Add exclusive_storage to blockdev_grow RPC
[ganeti-local] / lib / cmdlib / instance_storage.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with storage of instances."""
23
24 import itertools
25 import logging
26 import os
27 import time
28
29 from ganeti import compat
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import ht
33 from ganeti import locking
34 from ganeti.masterd import iallocator
35 from ganeti import objects
36 from ganeti import utils
37 from ganeti import opcodes
38 from ganeti import rpc
39 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
40 from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
41   AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \
42   CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
43   IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes
44 from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
45   CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
46   BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
47
48 import ganeti.masterd.instance
49
50
51 _DISK_TEMPLATE_NAME_PREFIX = {
52   constants.DT_PLAIN: "",
53   constants.DT_RBD: ".rbd",
54   constants.DT_EXT: ".ext",
55   }
56
57
58 _DISK_TEMPLATE_DEVICE_TYPE = {
59   constants.DT_PLAIN: constants.LD_LV,
60   constants.DT_FILE: constants.LD_FILE,
61   constants.DT_SHARED_FILE: constants.LD_FILE,
62   constants.DT_BLOCK: constants.LD_BLOCKDEV,
63   constants.DT_RBD: constants.LD_RBD,
64   constants.DT_EXT: constants.LD_EXT,
65   }
66
67
68 def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
69                          excl_stor):
70   """Create a single block device on a given node.
71
72   This will not recurse over children of the device, so they must be
73   created in advance.
74
75   @param lu: the lu on whose behalf we execute
76   @param node_uuid: the node on which to create the device
77   @type instance: L{objects.Instance}
78   @param instance: the instance which owns the device
79   @type device: L{objects.Disk}
80   @param device: the device to create
81   @param info: the extra 'metadata' we should attach to the device
82       (this will be represented as a LVM tag)
83   @type force_open: boolean
84   @param force_open: this parameter will be passes to the
85       L{backend.BlockdevCreate} function where it specifies
86       whether we run on primary or not, and it affects both
87       the child assembly and the device own Open() execution
88   @type excl_stor: boolean
89   @param excl_stor: Whether exclusive_storage is active for the node
90
91   """
92   lu.cfg.SetDiskID(device, node_uuid)
93   result = lu.rpc.call_blockdev_create(node_uuid, device, device.size,
94                                        instance.name, force_open, info,
95                                        excl_stor)
96   result.Raise("Can't create block device %s on"
97                " node %s for instance %s" % (device,
98                                              lu.cfg.GetNodeName(node_uuid),
99                                              instance.name))
100   if device.physical_id is None:
101     device.physical_id = result.payload
102
103
104 def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
105                          info, force_open, excl_stor):
106   """Create a tree of block devices on a given node.
107
108   If this device type has to be created on secondaries, create it and
109   all its children.
110
111   If not, just recurse to children keeping the same 'force' value.
112
113   @attention: The device has to be annotated already.
114
115   @param lu: the lu on whose behalf we execute
116   @param node_uuid: the node on which to create the device
117   @type instance: L{objects.Instance}
118   @param instance: the instance which owns the device
119   @type device: L{objects.Disk}
120   @param device: the device to create
121   @type force_create: boolean
122   @param force_create: whether to force creation of this device; this
123       will be change to True whenever we find a device which has
124       CreateOnSecondary() attribute
125   @param info: the extra 'metadata' we should attach to the device
126       (this will be represented as a LVM tag)
127   @type force_open: boolean
128   @param force_open: this parameter will be passes to the
129       L{backend.BlockdevCreate} function where it specifies
130       whether we run on primary or not, and it affects both
131       the child assembly and the device own Open() execution
132   @type excl_stor: boolean
133   @param excl_stor: Whether exclusive_storage is active for the node
134
135   @return: list of created devices
136   """
137   created_devices = []
138   try:
139     if device.CreateOnSecondary():
140       force_create = True
141
142     if device.children:
143       for child in device.children:
144         devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
145                                     force_create, info, force_open, excl_stor)
146         created_devices.extend(devs)
147
148     if not force_create:
149       return created_devices
150
151     CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
152                          excl_stor)
153     # The device has been completely created, so there is no point in keeping
154     # its subdevices in the list. We just add the device itself instead.
155     created_devices = [(node_uuid, device)]
156     return created_devices
157
158   except errors.DeviceCreationError, e:
159     e.created_devices.extend(created_devices)
160     raise e
161   except errors.OpExecError, e:
162     raise errors.DeviceCreationError(str(e), created_devices)
163
164
165 def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid):
166   """Whether exclusive_storage is in effect for the given node.
167
168   @type cfg: L{config.ConfigWriter}
169   @param cfg: The cluster configuration
170   @type node_uuid: string
171   @param node_uuid: The node UUID
172   @rtype: bool
173   @return: The effective value of exclusive_storage
174   @raise errors.OpPrereqError: if no node exists with the given name
175
176   """
177   ni = cfg.GetNodeInfo(node_uuid)
178   if ni is None:
179     raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
180                                errors.ECODE_NOENT)
181   return IsExclusiveStorageEnabledNode(cfg, ni)
182
183
184 def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
185                     force_open):
186   """Wrapper around L{_CreateBlockDevInner}.
187
188   This method annotates the root device first.
189
190   """
191   (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
192   excl_stor = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
193   return _CreateBlockDevInner(lu, node_uuid, instance, disk, force_create, info,
194                               force_open, excl_stor)
195
196
197 def _UndoCreateDisks(lu, disks_created):
198   """Undo the work performed by L{CreateDisks}.
199
200   This function is called in case of an error to undo the work of
201   L{CreateDisks}.
202
203   @type lu: L{LogicalUnit}
204   @param lu: the logical unit on whose behalf we execute
205   @param disks_created: the result returned by L{CreateDisks}
206
207   """
208   for (node_uuid, disk) in disks_created:
209     lu.cfg.SetDiskID(disk, node_uuid)
210     result = lu.rpc.call_blockdev_remove(node_uuid, disk)
211     result.Warn("Failed to remove newly-created disk %s on node %s" %
212                 (disk, lu.cfg.GetNodeName(node_uuid)), logging.warning)
213
214
215 def CreateDisks(lu, instance, to_skip=None, target_node_uuid=None, disks=None):
216   """Create all disks for an instance.
217
218   This abstracts away some work from AddInstance.
219
220   @type lu: L{LogicalUnit}
221   @param lu: the logical unit on whose behalf we execute
222   @type instance: L{objects.Instance}
223   @param instance: the instance whose disks we should create
224   @type to_skip: list
225   @param to_skip: list of indices to skip
226   @type target_node_uuid: string
227   @param target_node_uuid: if passed, overrides the target node for creation
228   @type disks: list of {objects.Disk}
229   @param disks: the disks to create; if not specified, all the disks of the
230       instance are created
231   @return: information about the created disks, to be used to call
232       L{_UndoCreateDisks}
233   @raise errors.OpPrereqError: in case of error
234
235   """
236   info = GetInstanceInfoText(instance)
237   if target_node_uuid is None:
238     pnode_uuid = instance.primary_node
239     all_node_uuids = instance.all_nodes
240   else:
241     pnode_uuid = target_node_uuid
242     all_node_uuids = [pnode_uuid]
243
244   if disks is None:
245     disks = instance.disks
246
247   if instance.disk_template in constants.DTS_FILEBASED:
248     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
249     result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir)
250
251     result.Raise("Failed to create directory '%s' on"
252                  " node %s" % (file_storage_dir,
253                                lu.cfg.GetNodeName(pnode_uuid)))
254
255   disks_created = []
256   for idx, device in enumerate(disks):
257     if to_skip and idx in to_skip:
258       continue
259     logging.info("Creating disk %s for instance '%s'", idx, instance.name)
260     for node_uuid in all_node_uuids:
261       f_create = node_uuid == pnode_uuid
262       try:
263         _CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
264                         f_create)
265         disks_created.append((node_uuid, device))
266       except errors.DeviceCreationError, e:
267         logging.warning("Creating disk %s for instance '%s' failed",
268                         idx, instance.name)
269         disks_created.extend(e.created_devices)
270         _UndoCreateDisks(lu, disks_created)
271         raise errors.OpExecError(e.message)
272   return disks_created
273
274
275 def ComputeDiskSizePerVG(disk_template, disks):
276   """Compute disk size requirements in the volume group
277
278   """
279   def _compute(disks, payload):
280     """Universal algorithm.
281
282     """
283     vgs = {}
284     for disk in disks:
285       vgs[disk[constants.IDISK_VG]] = \
286         vgs.get(constants.IDISK_VG, 0) + disk[constants.IDISK_SIZE] + payload
287
288     return vgs
289
290   # Required free disk space as a function of disk and swap space
291   req_size_dict = {
292     constants.DT_DISKLESS: {},
293     constants.DT_PLAIN: _compute(disks, 0),
294     # 128 MB are added for drbd metadata for each disk
295     constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
296     constants.DT_FILE: {},
297     constants.DT_SHARED_FILE: {},
298     }
299
300   if disk_template not in req_size_dict:
301     raise errors.ProgrammerError("Disk template '%s' size requirement"
302                                  " is unknown" % disk_template)
303
304   return req_size_dict[disk_template]
305
306
307 def ComputeDisks(op, default_vg):
308   """Computes the instance disks.
309
310   @param op: The instance opcode
311   @param default_vg: The default_vg to assume
312
313   @return: The computed disks
314
315   """
316   disks = []
317   for disk in op.disks:
318     mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
319     if mode not in constants.DISK_ACCESS_SET:
320       raise errors.OpPrereqError("Invalid disk access mode '%s'" %
321                                  mode, errors.ECODE_INVAL)
322     size = disk.get(constants.IDISK_SIZE, None)
323     if size is None:
324       raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
325     try:
326       size = int(size)
327     except (TypeError, ValueError):
328       raise errors.OpPrereqError("Invalid disk size '%s'" % size,
329                                  errors.ECODE_INVAL)
330
331     ext_provider = disk.get(constants.IDISK_PROVIDER, None)
332     if ext_provider and op.disk_template != constants.DT_EXT:
333       raise errors.OpPrereqError("The '%s' option is only valid for the %s"
334                                  " disk template, not %s" %
335                                  (constants.IDISK_PROVIDER, constants.DT_EXT,
336                                   op.disk_template), errors.ECODE_INVAL)
337
338     data_vg = disk.get(constants.IDISK_VG, default_vg)
339     name = disk.get(constants.IDISK_NAME, None)
340     if name is not None and name.lower() == constants.VALUE_NONE:
341       name = None
342     new_disk = {
343       constants.IDISK_SIZE: size,
344       constants.IDISK_MODE: mode,
345       constants.IDISK_VG: data_vg,
346       constants.IDISK_NAME: name,
347       }
348
349     for key in [
350       constants.IDISK_METAVG,
351       constants.IDISK_ADOPT,
352       constants.IDISK_SPINDLES,
353       ]:
354       if key in disk:
355         new_disk[key] = disk[key]
356
357     # For extstorage, demand the `provider' option and add any
358     # additional parameters (ext-params) to the dict
359     if op.disk_template == constants.DT_EXT:
360       if ext_provider:
361         new_disk[constants.IDISK_PROVIDER] = ext_provider
362         for key in disk:
363           if key not in constants.IDISK_PARAMS:
364             new_disk[key] = disk[key]
365       else:
366         raise errors.OpPrereqError("Missing provider for template '%s'" %
367                                    constants.DT_EXT, errors.ECODE_INVAL)
368
369     disks.append(new_disk)
370
371   return disks
372
373
374 def CheckRADOSFreeSpace():
375   """Compute disk size requirements inside the RADOS cluster.
376
377   """
378   # For the RADOS cluster we assume there is always enough space.
379   pass
380
381
382 def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
383                          iv_name, p_minor, s_minor):
384   """Generate a drbd8 device complete with its children.
385
386   """
387   assert len(vgnames) == len(names) == 2
388   port = lu.cfg.AllocatePort()
389   shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
390
391   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
392                           logical_id=(vgnames[0], names[0]),
393                           params={})
394   dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
395   dev_meta = objects.Disk(dev_type=constants.LD_LV,
396                           size=constants.DRBD_META_SIZE,
397                           logical_id=(vgnames[1], names[1]),
398                           params={})
399   dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
400   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
401                           logical_id=(primary_uuid, secondary_uuid, port,
402                                       p_minor, s_minor,
403                                       shared_secret),
404                           children=[dev_data, dev_meta],
405                           iv_name=iv_name, params={})
406   drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
407   return drbd_dev
408
409
410 def GenerateDiskTemplate(
411   lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
412   disk_info, file_storage_dir, file_driver, base_index,
413   feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
414   _req_shr_file_storage=opcodes.RequireSharedFileStorage):
415   """Generate the entire disk layout for a given template type.
416
417   """
418   vgname = lu.cfg.GetVGName()
419   disk_count = len(disk_info)
420   disks = []
421
422   if template_name == constants.DT_DISKLESS:
423     pass
424   elif template_name == constants.DT_DRBD8:
425     if len(secondary_node_uuids) != 1:
426       raise errors.ProgrammerError("Wrong template configuration")
427     remote_node_uuid = secondary_node_uuids[0]
428     minors = lu.cfg.AllocateDRBDMinor(
429       [primary_node_uuid, remote_node_uuid] * len(disk_info), instance_uuid)
430
431     (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
432                                                        full_disk_params)
433     drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
434
435     names = []
436     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
437                                                for i in range(disk_count)]):
438       names.append(lv_prefix + "_data")
439       names.append(lv_prefix + "_meta")
440     for idx, disk in enumerate(disk_info):
441       disk_index = idx + base_index
442       data_vg = disk.get(constants.IDISK_VG, vgname)
443       meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
444       disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
445                                       disk[constants.IDISK_SIZE],
446                                       [data_vg, meta_vg],
447                                       names[idx * 2:idx * 2 + 2],
448                                       "disk/%d" % disk_index,
449                                       minors[idx * 2], minors[idx * 2 + 1])
450       disk_dev.mode = disk[constants.IDISK_MODE]
451       disk_dev.name = disk.get(constants.IDISK_NAME, None)
452       disks.append(disk_dev)
453   else:
454     if secondary_node_uuids:
455       raise errors.ProgrammerError("Wrong template configuration")
456
457     if template_name == constants.DT_FILE:
458       _req_file_storage()
459     elif template_name == constants.DT_SHARED_FILE:
460       _req_shr_file_storage()
461
462     name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
463     if name_prefix is None:
464       names = None
465     else:
466       names = _GenerateUniqueNames(lu, ["%s.disk%s" %
467                                         (name_prefix, base_index + i)
468                                         for i in range(disk_count)])
469
470     if template_name == constants.DT_PLAIN:
471
472       def logical_id_fn(idx, _, disk):
473         vg = disk.get(constants.IDISK_VG, vgname)
474         return (vg, names[idx])
475
476     elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
477       logical_id_fn = \
478         lambda _, disk_index, disk: (file_driver,
479                                      "%s/disk%d" % (file_storage_dir,
480                                                     disk_index))
481     elif template_name == constants.DT_BLOCK:
482       logical_id_fn = \
483         lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
484                                        disk[constants.IDISK_ADOPT])
485     elif template_name == constants.DT_RBD:
486       logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
487     elif template_name == constants.DT_EXT:
488       def logical_id_fn(idx, _, disk):
489         provider = disk.get(constants.IDISK_PROVIDER, None)
490         if provider is None:
491           raise errors.ProgrammerError("Disk template is %s, but '%s' is"
492                                        " not found", constants.DT_EXT,
493                                        constants.IDISK_PROVIDER)
494         return (provider, names[idx])
495     else:
496       raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
497
498     dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
499
500     for idx, disk in enumerate(disk_info):
501       params = {}
502       # Only for the Ext template add disk_info to params
503       if template_name == constants.DT_EXT:
504         params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
505         for key in disk:
506           if key not in constants.IDISK_PARAMS:
507             params[key] = disk[key]
508       disk_index = idx + base_index
509       size = disk[constants.IDISK_SIZE]
510       feedback_fn("* disk %s, size %s" %
511                   (disk_index, utils.FormatUnit(size, "h")))
512       disk_dev = objects.Disk(dev_type=dev_type, size=size,
513                               logical_id=logical_id_fn(idx, disk_index, disk),
514                               iv_name="disk/%d" % disk_index,
515                               mode=disk[constants.IDISK_MODE],
516                               params=params,
517                               spindles=disk.get(constants.IDISK_SPINDLES))
518       disk_dev.name = disk.get(constants.IDISK_NAME, None)
519       disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
520       disks.append(disk_dev)
521
522   return disks
523
524
525 def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
526   """Check the presence of the spindle options with exclusive_storage.
527
528   @type diskdict: dict
529   @param diskdict: disk parameters
530   @type es_flag: bool
531   @param es_flag: the effective value of the exlusive_storage flag
532   @type required: bool
533   @param required: whether spindles are required or just optional
534   @raise errors.OpPrereqError when spindles are given and they should not
535
536   """
537   if (not es_flag and constants.IDISK_SPINDLES in diskdict and
538       diskdict[constants.IDISK_SPINDLES] is not None):
539     raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
540                                " when exclusive storage is not active",
541                                errors.ECODE_INVAL)
542   if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or
543                                 diskdict[constants.IDISK_SPINDLES] is None)):
544     raise errors.OpPrereqError("You must specify spindles in instance disks"
545                                " when exclusive storage is active",
546                                errors.ECODE_INVAL)
547
548
549 class LUInstanceRecreateDisks(LogicalUnit):
550   """Recreate an instance's missing disks.
551
552   """
553   HPATH = "instance-recreate-disks"
554   HTYPE = constants.HTYPE_INSTANCE
555   REQ_BGL = False
556
557   _MODIFYABLE = compat.UniqueFrozenset([
558     constants.IDISK_SIZE,
559     constants.IDISK_MODE,
560     constants.IDISK_SPINDLES,
561     ])
562
563   # New or changed disk parameters may have different semantics
564   assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
565     constants.IDISK_ADOPT,
566
567     # TODO: Implement support changing VG while recreating
568     constants.IDISK_VG,
569     constants.IDISK_METAVG,
570     constants.IDISK_PROVIDER,
571     constants.IDISK_NAME,
572     ]))
573
574   def _RunAllocator(self):
575     """Run the allocator based on input opcode.
576
577     """
578     be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
579
580     # FIXME
581     # The allocator should actually run in "relocate" mode, but current
582     # allocators don't support relocating all the nodes of an instance at
583     # the same time. As a workaround we use "allocate" mode, but this is
584     # suboptimal for two reasons:
585     # - The instance name passed to the allocator is present in the list of
586     #   existing instances, so there could be a conflict within the
587     #   internal structures of the allocator. This doesn't happen with the
588     #   current allocators, but it's a liability.
589     # - The allocator counts the resources used by the instance twice: once
590     #   because the instance exists already, and once because it tries to
591     #   allocate a new instance.
592     # The allocator could choose some of the nodes on which the instance is
593     # running, but that's not a problem. If the instance nodes are broken,
594     # they should be already be marked as drained or offline, and hence
595     # skipped by the allocator. If instance disks have been lost for other
596     # reasons, then recreating the disks on the same nodes should be fine.
597     disk_template = self.instance.disk_template
598     spindle_use = be_full[constants.BE_SPINDLE_USE]
599     disks = [{
600       constants.IDISK_SIZE: d.size,
601       constants.IDISK_MODE: d.mode,
602       constants.IDISK_SPINDLES: d.spindles,
603       } for d in self.instance.disks]
604     req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
605                                         disk_template=disk_template,
606                                         tags=list(self.instance.GetTags()),
607                                         os=self.instance.os,
608                                         nics=[{}],
609                                         vcpus=be_full[constants.BE_VCPUS],
610                                         memory=be_full[constants.BE_MAXMEM],
611                                         spindle_use=spindle_use,
612                                         disks=disks,
613                                         hypervisor=self.instance.hypervisor,
614                                         node_whitelist=None)
615     ial = iallocator.IAllocator(self.cfg, self.rpc, req)
616
617     ial.Run(self.op.iallocator)
618
619     assert req.RequiredNodes() == len(self.instance.all_nodes)
620
621     if not ial.success:
622       raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
623                                  " %s" % (self.op.iallocator, ial.info),
624                                  errors.ECODE_NORES)
625
626     (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
627     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
628                  self.op.instance_name, self.op.iallocator,
629                  utils.CommaJoin(self.op.nodes))
630
631   def CheckArguments(self):
632     if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
633       # Normalize and convert deprecated list of disk indices
634       self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
635
636     duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
637     if duplicates:
638       raise errors.OpPrereqError("Some disks have been specified more than"
639                                  " once: %s" % utils.CommaJoin(duplicates),
640                                  errors.ECODE_INVAL)
641
642     # We don't want _CheckIAllocatorOrNode selecting the default iallocator
643     # when neither iallocator nor nodes are specified
644     if self.op.iallocator or self.op.nodes:
645       CheckIAllocatorOrNode(self, "iallocator", "nodes")
646
647     for (idx, params) in self.op.disks:
648       utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
649       unsupported = frozenset(params.keys()) - self._MODIFYABLE
650       if unsupported:
651         raise errors.OpPrereqError("Parameters for disk %s try to change"
652                                    " unmodifyable parameter(s): %s" %
653                                    (idx, utils.CommaJoin(unsupported)),
654                                    errors.ECODE_INVAL)
655
656   def ExpandNames(self):
657     self._ExpandAndLockInstance()
658     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
659
660     if self.op.nodes:
661       (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
662       self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)
663     else:
664       self.needed_locks[locking.LEVEL_NODE] = []
665       if self.op.iallocator:
666         # iallocator will select a new node in the same group
667         self.needed_locks[locking.LEVEL_NODEGROUP] = []
668         self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
669
670     self.needed_locks[locking.LEVEL_NODE_RES] = []
671
672   def DeclareLocks(self, level):
673     if level == locking.LEVEL_NODEGROUP:
674       assert self.op.iallocator is not None
675       assert not self.op.nodes
676       assert not self.needed_locks[locking.LEVEL_NODEGROUP]
677       self.share_locks[locking.LEVEL_NODEGROUP] = 1
678       # Lock the primary group used by the instance optimistically; this
679       # requires going via the node before it's locked, requiring
680       # verification later on
681       self.needed_locks[locking.LEVEL_NODEGROUP] = \
682         self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, primary_only=True)
683
684     elif level == locking.LEVEL_NODE:
685       # If an allocator is used, then we lock all the nodes in the current
686       # instance group, as we don't know yet which ones will be selected;
687       # if we replace the nodes without using an allocator, locks are
688       # already declared in ExpandNames; otherwise, we need to lock all the
689       # instance nodes for disk re-creation
690       if self.op.iallocator:
691         assert not self.op.nodes
692         assert not self.needed_locks[locking.LEVEL_NODE]
693         assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
694
695         # Lock member nodes of the group of the primary node
696         for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
697           self.needed_locks[locking.LEVEL_NODE].extend(
698             self.cfg.GetNodeGroup(group_uuid).members)
699
700         assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
701       elif not self.op.nodes:
702         self._LockInstancesNodes(primary_only=False)
703     elif level == locking.LEVEL_NODE_RES:
704       # Copy node locks
705       self.needed_locks[locking.LEVEL_NODE_RES] = \
706         CopyLockList(self.needed_locks[locking.LEVEL_NODE])
707
708   def BuildHooksEnv(self):
709     """Build hooks env.
710
711     This runs on master, primary and secondary nodes of the instance.
712
713     """
714     return BuildInstanceHookEnvByObject(self, self.instance)
715
716   def BuildHooksNodes(self):
717     """Build hooks nodes.
718
719     """
720     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
721     return (nl, nl)
722
723   def CheckPrereq(self):
724     """Check prerequisites.
725
726     This checks that the instance is in the cluster and is not running.
727
728     """
729     instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
730     assert instance is not None, \
731       "Cannot retrieve locked instance %s" % self.op.instance_name
732     if self.op.node_uuids:
733       if len(self.op.node_uuids) != len(instance.all_nodes):
734         raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
735                                    " %d replacement nodes were specified" %
736                                    (instance.name, len(instance.all_nodes),
737                                     len(self.op.node_uuids)),
738                                    errors.ECODE_INVAL)
739       assert instance.disk_template != constants.DT_DRBD8 or \
740              len(self.op.node_uuids) == 2
741       assert instance.disk_template != constants.DT_PLAIN or \
742              len(self.op.node_uuids) == 1
743       primary_node = self.op.node_uuids[0]
744     else:
745       primary_node = instance.primary_node
746     if not self.op.iallocator:
747       CheckNodeOnline(self, primary_node)
748
749     if instance.disk_template == constants.DT_DISKLESS:
750       raise errors.OpPrereqError("Instance '%s' has no disks" %
751                                  self.op.instance_name, errors.ECODE_INVAL)
752
753     # Verify if node group locks are still correct
754     owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
755     if owned_groups:
756       # Node group locks are acquired only for the primary node (and only
757       # when the allocator is used)
758       CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
759                               primary_only=True)
760
761     # if we replace nodes *and* the old primary is offline, we don't
762     # check the instance state
763     old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
764     if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
765       CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
766                          msg="cannot recreate disks")
767
768     if self.op.disks:
769       self.disks = dict(self.op.disks)
770     else:
771       self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
772
773     maxidx = max(self.disks.keys())
774     if maxidx >= len(instance.disks):
775       raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
776                                  errors.ECODE_INVAL)
777
778     if ((self.op.node_uuids or self.op.iallocator) and
779          sorted(self.disks.keys()) != range(len(instance.disks))):
780       raise errors.OpPrereqError("Can't recreate disks partially and"
781                                  " change the nodes at the same time",
782                                  errors.ECODE_INVAL)
783
784     self.instance = instance
785
786     if self.op.iallocator:
787       self._RunAllocator()
788       # Release unneeded node and node resource locks
789       ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
790       ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
791       ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
792
793     assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
794
795     if self.op.node_uuids:
796       node_uuids = self.op.node_uuids
797     else:
798       node_uuids = instance.all_nodes
799     excl_stor = compat.any(
800       rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
801       )
802     for new_params in self.disks.values():
803       CheckSpindlesExclusiveStorage(new_params, excl_stor, False)
804
805   def Exec(self, feedback_fn):
806     """Recreate the disks.
807
808     """
809     assert (self.owned_locks(locking.LEVEL_NODE) ==
810             self.owned_locks(locking.LEVEL_NODE_RES))
811
812     to_skip = []
813     mods = [] # keeps track of needed changes
814
815     for idx, disk in enumerate(self.instance.disks):
816       try:
817         changes = self.disks[idx]
818       except KeyError:
819         # Disk should not be recreated
820         to_skip.append(idx)
821         continue
822
823       # update secondaries for disks, if needed
824       if self.op.node_uuids and disk.dev_type == constants.LD_DRBD8:
825         # need to update the nodes and minors
826         assert len(self.op.node_uuids) == 2
827         assert len(disk.logical_id) == 6 # otherwise disk internals
828                                          # have changed
829         (_, _, old_port, _, _, old_secret) = disk.logical_id
830         new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
831                                                 self.instance.uuid)
832         new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
833                   new_minors[0], new_minors[1], old_secret)
834         assert len(disk.logical_id) == len(new_id)
835       else:
836         new_id = None
837
838       mods.append((idx, new_id, changes))
839
840     # now that we have passed all asserts above, we can apply the mods
841     # in a single run (to avoid partial changes)
842     for idx, new_id, changes in mods:
843       disk = self.instance.disks[idx]
844       if new_id is not None:
845         assert disk.dev_type == constants.LD_DRBD8
846         disk.logical_id = new_id
847       if changes:
848         disk.Update(size=changes.get(constants.IDISK_SIZE, None),
849                     mode=changes.get(constants.IDISK_MODE, None),
850                     spindles=changes.get(constants.IDISK_SPINDLES, None))
851
852     # change primary node, if needed
853     if self.op.node_uuids:
854       self.instance.primary_node = self.op.node_uuids[0]
855       self.LogWarning("Changing the instance's nodes, you will have to"
856                       " remove any disks left on the older nodes manually")
857
858     if self.op.node_uuids:
859       self.cfg.Update(self.instance, feedback_fn)
860
861     # All touched nodes must be locked
862     mylocks = self.owned_locks(locking.LEVEL_NODE)
863     assert mylocks.issuperset(frozenset(self.instance.all_nodes))
864     new_disks = CreateDisks(self, self.instance, to_skip=to_skip)
865
866     # TODO: Release node locks before wiping, or explain why it's not possible
867     if self.cfg.GetClusterInfo().prealloc_wipe_disks:
868       wipedisks = [(idx, disk, 0)
869                    for (idx, disk) in enumerate(self.instance.disks)
870                    if idx not in to_skip]
871       WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
872                          cleanup=new_disks)
873
874
875 def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
876   """Checks if nodes have enough free disk space in the specified VG.
877
878   This function checks if all given nodes have the needed amount of
879   free disk. In case any node has less disk or we cannot get the
880   information from the node, this function raises an OpPrereqError
881   exception.
882
883   @type lu: C{LogicalUnit}
884   @param lu: a logical unit from which we get configuration data
885   @type node_uuids: C{list}
886   @param node_uuids: the list of node UUIDs to check
887   @type vg: C{str}
888   @param vg: the volume group to check
889   @type requested: C{int}
890   @param requested: the amount of disk in MiB to check for
891   @raise errors.OpPrereqError: if the node doesn't have enough disk,
892       or we cannot check the node
893
894   """
895   es_flags = rpc.GetExclusiveStorageForNodes(lu.cfg, node_uuids)
896   hvname = lu.cfg.GetHypervisorType()
897   hvparams = lu.cfg.GetClusterInfo().hvparams
898   nodeinfo = lu.rpc.call_node_info(node_uuids, [(constants.ST_LVM_VG, vg)],
899                                    [(hvname, hvparams[hvname])], es_flags)
900   for node in node_uuids:
901     node_name = lu.cfg.GetNodeName(node)
902
903     info = nodeinfo[node]
904     info.Raise("Cannot get current information from node %s" % node_name,
905                prereq=True, ecode=errors.ECODE_ENVIRON)
906     (_, (vg_info, ), _) = info.payload
907     vg_free = vg_info.get("storage_free", None)
908     if not isinstance(vg_free, int):
909       raise errors.OpPrereqError("Can't compute free disk space on node"
910                                  " %s for vg %s, result was '%s'" %
911                                  (node_name, vg, vg_free), errors.ECODE_ENVIRON)
912     if requested > vg_free:
913       raise errors.OpPrereqError("Not enough disk space on target node %s"
914                                  " vg %s: required %d MiB, available %d MiB" %
915                                  (node_name, vg, requested, vg_free),
916                                  errors.ECODE_NORES)
917
918
919 def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
920   """Checks if nodes have enough free disk space in all the VGs.
921
922   This function checks if all given nodes have the needed amount of
923   free disk. In case any node has less disk or we cannot get the
924   information from the node, this function raises an OpPrereqError
925   exception.
926
927   @type lu: C{LogicalUnit}
928   @param lu: a logical unit from which we get configuration data
929   @type node_uuids: C{list}
930   @param node_uuids: the list of node UUIDs to check
931   @type req_sizes: C{dict}
932   @param req_sizes: the hash of vg and corresponding amount of disk in
933       MiB to check for
934   @raise errors.OpPrereqError: if the node doesn't have enough disk,
935       or we cannot check the node
936
937   """
938   for vg, req_size in req_sizes.items():
939     _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, req_size)
940
941
942 def _DiskSizeInBytesToMebibytes(lu, size):
943   """Converts a disk size in bytes to mebibytes.
944
945   Warns and rounds up if the size isn't an even multiple of 1 MiB.
946
947   """
948   (mib, remainder) = divmod(size, 1024 * 1024)
949
950   if remainder != 0:
951     lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
952                   " to not overwrite existing data (%s bytes will not be"
953                   " wiped)", (1024 * 1024) - remainder)
954     mib += 1
955
956   return mib
957
958
959 def _CalcEta(time_taken, written, total_size):
960   """Calculates the ETA based on size written and total size.
961
962   @param time_taken: The time taken so far
963   @param written: amount written so far
964   @param total_size: The total size of data to be written
965   @return: The remaining time in seconds
966
967   """
968   avg_time = time_taken / float(written)
969   return (total_size - written) * avg_time
970
971
972 def WipeDisks(lu, instance, disks=None):
973   """Wipes instance disks.
974
975   @type lu: L{LogicalUnit}
976   @param lu: the logical unit on whose behalf we execute
977   @type instance: L{objects.Instance}
978   @param instance: the instance whose disks we should create
979   @type disks: None or list of tuple of (number, L{objects.Disk}, number)
980   @param disks: Disk details; tuple contains disk index, disk object and the
981     start offset
982
983   """
984   node_uuid = instance.primary_node
985   node_name = lu.cfg.GetNodeName(node_uuid)
986
987   if disks is None:
988     disks = [(idx, disk, 0)
989              for (idx, disk) in enumerate(instance.disks)]
990
991   for (_, device, _) in disks:
992     lu.cfg.SetDiskID(device, node_uuid)
993
994   logging.info("Pausing synchronization of disks of instance '%s'",
995                instance.name)
996   result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
997                                                   (map(compat.snd, disks),
998                                                    instance),
999                                                   True)
1000   result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)
1001
1002   for idx, success in enumerate(result.payload):
1003     if not success:
1004       logging.warn("Pausing synchronization of disk %s of instance '%s'"
1005                    " failed", idx, instance.name)
1006
1007   try:
1008     for (idx, device, offset) in disks:
1009       # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
1010       # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
1011       wipe_chunk_size = \
1012         int(min(constants.MAX_WIPE_CHUNK,
1013                 device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
1014
1015       size = device.size
1016       last_output = 0
1017       start_time = time.time()
1018
1019       if offset == 0:
1020         info_text = ""
1021       else:
1022         info_text = (" (from %s to %s)" %
1023                      (utils.FormatUnit(offset, "h"),
1024                       utils.FormatUnit(size, "h")))
1025
1026       lu.LogInfo("* Wiping disk %s%s", idx, info_text)
1027
1028       logging.info("Wiping disk %d for instance %s on node %s using"
1029                    " chunk size %s", idx, instance.name, node_name,
1030                    wipe_chunk_size)
1031
1032       while offset < size:
1033         wipe_size = min(wipe_chunk_size, size - offset)
1034
1035         logging.debug("Wiping disk %d, offset %s, chunk %s",
1036                       idx, offset, wipe_size)
1037
1038         result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
1039                                            offset, wipe_size)
1040         result.Raise("Could not wipe disk %d at offset %d for size %d" %
1041                      (idx, offset, wipe_size))
1042
1043         now = time.time()
1044         offset += wipe_size
1045         if now - last_output >= 60:
1046           eta = _CalcEta(now - start_time, offset, size)
1047           lu.LogInfo(" - done: %.1f%% ETA: %s",
1048                      offset / float(size) * 100, utils.FormatSeconds(eta))
1049           last_output = now
1050   finally:
1051     logging.info("Resuming synchronization of disks for instance '%s'",
1052                  instance.name)
1053
1054     result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1055                                                     (map(compat.snd, disks),
1056                                                      instance),
1057                                                     False)
1058
1059     if result.fail_msg:
1060       lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
1061                     node_name, result.fail_msg)
1062     else:
1063       for idx, success in enumerate(result.payload):
1064         if not success:
1065           lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
1066                         " failed", idx, instance.name)
1067
1068
1069 def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
1070   """Wrapper for L{WipeDisks} that handles errors.
1071
1072   @type lu: L{LogicalUnit}
1073   @param lu: the logical unit on whose behalf we execute
1074   @type instance: L{objects.Instance}
1075   @param instance: the instance whose disks we should wipe
1076   @param disks: see L{WipeDisks}
1077   @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
1078       case of error
1079   @raise errors.OpPrereqError: in case of failure
1080
1081   """
1082   try:
1083     WipeDisks(lu, instance, disks=disks)
1084   except errors.OpExecError:
1085     logging.warning("Wiping disks for instance '%s' failed",
1086                     instance.name)
1087     _UndoCreateDisks(lu, cleanup)
1088     raise
1089
1090
1091 def ExpandCheckDisks(instance, disks):
1092   """Return the instance disks selected by the disks list
1093
1094   @type disks: list of L{objects.Disk} or None
1095   @param disks: selected disks
1096   @rtype: list of L{objects.Disk}
1097   @return: selected instance disks to act on
1098
1099   """
1100   if disks is None:
1101     return instance.disks
1102   else:
1103     if not set(disks).issubset(instance.disks):
1104       raise errors.ProgrammerError("Can only act on disks belonging to the"
1105                                    " target instance: expected a subset of %r,"
1106                                    " got %r" % (instance.disks, disks))
1107     return disks
1108
1109
1110 def WaitForSync(lu, instance, disks=None, oneshot=False):
1111   """Sleep and poll for an instance's disk to sync.
1112
1113   """
1114   if not instance.disks or disks is not None and not disks:
1115     return True
1116
1117   disks = ExpandCheckDisks(instance, disks)
1118
1119   if not oneshot:
1120     lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1121
1122   node_uuid = instance.primary_node
1123   node_name = lu.cfg.GetNodeName(node_uuid)
1124
1125   for dev in disks:
1126     lu.cfg.SetDiskID(dev, node_uuid)
1127
1128   # TODO: Convert to utils.Retry
1129
1130   retries = 0
1131   degr_retries = 10 # in seconds, as we sleep 1 second each time
1132   while True:
1133     max_time = 0
1134     done = True
1135     cumul_degraded = False
1136     rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
1137     msg = rstats.fail_msg
1138     if msg:
1139       lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
1140       retries += 1
1141       if retries >= 10:
1142         raise errors.RemoteError("Can't contact node %s for mirror data,"
1143                                  " aborting." % node_name)
1144       time.sleep(6)
1145       continue
1146     rstats = rstats.payload
1147     retries = 0
1148     for i, mstat in enumerate(rstats):
1149       if mstat is None:
1150         lu.LogWarning("Can't compute data for node %s/%s",
1151                       node_name, disks[i].iv_name)
1152         continue
1153
1154       cumul_degraded = (cumul_degraded or
1155                         (mstat.is_degraded and mstat.sync_percent is None))
1156       if mstat.sync_percent is not None:
1157         done = False
1158         if mstat.estimated_time is not None:
1159           rem_time = ("%s remaining (estimated)" %
1160                       utils.FormatSeconds(mstat.estimated_time))
1161           max_time = mstat.estimated_time
1162         else:
1163           rem_time = "no time estimate"
1164         lu.LogInfo("- device %s: %5.2f%% done, %s",
1165                    disks[i].iv_name, mstat.sync_percent, rem_time)
1166
1167     # if we're done but degraded, let's do a few small retries, to
1168     # make sure we see a stable and not transient situation; therefore
1169     # we force restart of the loop
1170     if (done or oneshot) and cumul_degraded and degr_retries > 0:
1171       logging.info("Degraded disks found, %d retries left", degr_retries)
1172       degr_retries -= 1
1173       time.sleep(1)
1174       continue
1175
1176     if done or oneshot:
1177       break
1178
1179     time.sleep(min(60, max_time))
1180
1181   if done:
1182     lu.LogInfo("Instance %s's disks are in sync", instance.name)
1183
1184   return not cumul_degraded
1185
1186
1187 def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1188   """Shutdown block devices of an instance.
1189
1190   This does the shutdown on all nodes of the instance.
1191
1192   If the ignore_primary is false, errors on the primary node are
1193   ignored.
1194
1195   """
1196   lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1197   all_result = True
1198   disks = ExpandCheckDisks(instance, disks)
1199
1200   for disk in disks:
1201     for node_uuid, top_disk in disk.ComputeNodeTree(instance.primary_node):
1202       lu.cfg.SetDiskID(top_disk, node_uuid)
1203       result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
1204       msg = result.fail_msg
1205       if msg:
1206         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1207                       disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1208         if ((node_uuid == instance.primary_node and not ignore_primary) or
1209             (node_uuid != instance.primary_node and not result.offline)):
1210           all_result = False
1211   return all_result
1212
1213
1214 def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1215   """Shutdown block devices of an instance.
1216
1217   This function checks if an instance is running, before calling
1218   _ShutdownInstanceDisks.
1219
1220   """
1221   CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1222   ShutdownInstanceDisks(lu, instance, disks=disks)
1223
1224
1225 def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1226                            ignore_size=False):
1227   """Prepare the block devices for an instance.
1228
1229   This sets up the block devices on all nodes.
1230
1231   @type lu: L{LogicalUnit}
1232   @param lu: the logical unit on whose behalf we execute
1233   @type instance: L{objects.Instance}
1234   @param instance: the instance for whose disks we assemble
1235   @type disks: list of L{objects.Disk} or None
1236   @param disks: which disks to assemble (or all, if None)
1237   @type ignore_secondaries: boolean
1238   @param ignore_secondaries: if true, errors on secondary nodes
1239       won't result in an error return from the function
1240   @type ignore_size: boolean
1241   @param ignore_size: if true, the current known size of the disk
1242       will not be used during the disk activation, useful for cases
1243       when the size is wrong
1244   @return: False if the operation failed, otherwise a list of
1245       (host, instance_visible_name, node_visible_name)
1246       with the mapping from node devices to instance devices
1247
1248   """
1249   device_info = []
1250   disks_ok = True
1251   disks = ExpandCheckDisks(instance, disks)
1252
1253   # With the two passes mechanism we try to reduce the window of
1254   # opportunity for the race condition of switching DRBD to primary
1255   # before handshaking occured, but we do not eliminate it
1256
1257   # The proper fix would be to wait (with some limits) until the
1258   # connection has been made and drbd transitions from WFConnection
1259   # into any other network-connected state (Connected, SyncTarget,
1260   # SyncSource, etc.)
1261
1262   # mark instance disks as active before doing actual work, so watcher does
1263   # not try to shut them down erroneously
1264   lu.cfg.MarkInstanceDisksActive(instance.uuid)
1265
1266   # 1st pass, assemble on all nodes in secondary mode
1267   for idx, inst_disk in enumerate(disks):
1268     for node_uuid, node_disk in inst_disk.ComputeNodeTree(
1269                                   instance.primary_node):
1270       if ignore_size:
1271         node_disk = node_disk.Copy()
1272         node_disk.UnsetSize()
1273       lu.cfg.SetDiskID(node_disk, node_uuid)
1274       result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
1275                                              instance.name, False, idx)
1276       msg = result.fail_msg
1277       if msg:
1278         is_offline_secondary = (node_uuid in instance.secondary_nodes and
1279                                 result.offline)
1280         lu.LogWarning("Could not prepare block device %s on node %s"
1281                       " (is_primary=False, pass=1): %s",
1282                       inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1283         if not (ignore_secondaries or is_offline_secondary):
1284           disks_ok = False
1285
1286   # FIXME: race condition on drbd migration to primary
1287
1288   # 2nd pass, do only the primary node
1289   for idx, inst_disk in enumerate(disks):
1290     dev_path = None
1291
1292     for node_uuid, node_disk in inst_disk.ComputeNodeTree(
1293                                   instance.primary_node):
1294       if node_uuid != instance.primary_node:
1295         continue
1296       if ignore_size:
1297         node_disk = node_disk.Copy()
1298         node_disk.UnsetSize()
1299       lu.cfg.SetDiskID(node_disk, node_uuid)
1300       result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
1301                                              instance.name, True, idx)
1302       msg = result.fail_msg
1303       if msg:
1304         lu.LogWarning("Could not prepare block device %s on node %s"
1305                       " (is_primary=True, pass=2): %s",
1306                       inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1307         disks_ok = False
1308       else:
1309         dev_path = result.payload
1310
1311     device_info.append((lu.cfg.GetNodeName(instance.primary_node),
1312                         inst_disk.iv_name, dev_path))
1313
1314   # leave the disks configured for the primary node
1315   # this is a workaround that would be fixed better by
1316   # improving the logical/physical id handling
1317   for disk in disks:
1318     lu.cfg.SetDiskID(disk, instance.primary_node)
1319
1320   if not disks_ok:
1321     lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1322
1323   return disks_ok, device_info
1324
1325
1326 def StartInstanceDisks(lu, instance, force):
1327   """Start the disks of an instance.
1328
1329   """
1330   disks_ok, _ = AssembleInstanceDisks(lu, instance,
1331                                       ignore_secondaries=force)
1332   if not disks_ok:
1333     ShutdownInstanceDisks(lu, instance)
1334     if force is not None and not force:
1335       lu.LogWarning("",
1336                     hint=("If the message above refers to a secondary node,"
1337                           " you can retry the operation using '--force'"))
1338     raise errors.OpExecError("Disk consistency error")
1339
1340
1341 class LUInstanceGrowDisk(LogicalUnit):
1342   """Grow a disk of an instance.
1343
1344   """
1345   HPATH = "disk-grow"
1346   HTYPE = constants.HTYPE_INSTANCE
1347   REQ_BGL = False
1348
1349   def ExpandNames(self):
1350     self._ExpandAndLockInstance()
1351     self.needed_locks[locking.LEVEL_NODE] = []
1352     self.needed_locks[locking.LEVEL_NODE_RES] = []
1353     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1354     self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
1355
1356   def DeclareLocks(self, level):
1357     if level == locking.LEVEL_NODE:
1358       self._LockInstancesNodes()
1359     elif level == locking.LEVEL_NODE_RES:
1360       # Copy node locks
1361       self.needed_locks[locking.LEVEL_NODE_RES] = \
1362         CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1363
1364   def BuildHooksEnv(self):
1365     """Build hooks env.
1366
1367     This runs on the master, the primary and all the secondaries.
1368
1369     """
1370     env = {
1371       "DISK": self.op.disk,
1372       "AMOUNT": self.op.amount,
1373       "ABSOLUTE": self.op.absolute,
1374       }
1375     env.update(BuildInstanceHookEnvByObject(self, self.instance))
1376     return env
1377
1378   def BuildHooksNodes(self):
1379     """Build hooks nodes.
1380
1381     """
1382     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1383     return (nl, nl)
1384
1385   def CheckPrereq(self):
1386     """Check prerequisites.
1387
1388     This checks that the instance is in the cluster.
1389
1390     """
1391     self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1392     assert self.instance is not None, \
1393       "Cannot retrieve locked instance %s" % self.op.instance_name
1394     node_uuids = list(self.instance.all_nodes)
1395     for node_uuid in node_uuids:
1396       CheckNodeOnline(self, node_uuid)
1397     self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)
1398
1399     if self.instance.disk_template not in constants.DTS_GROWABLE:
1400       raise errors.OpPrereqError("Instance's disk layout does not support"
1401                                  " growing", errors.ECODE_INVAL)
1402
1403     self.disk = self.instance.FindDisk(self.op.disk)
1404
1405     if self.op.absolute:
1406       self.target = self.op.amount
1407       self.delta = self.target - self.disk.size
1408       if self.delta < 0:
1409         raise errors.OpPrereqError("Requested size (%s) is smaller than "
1410                                    "current disk size (%s)" %
1411                                    (utils.FormatUnit(self.target, "h"),
1412                                     utils.FormatUnit(self.disk.size, "h")),
1413                                    errors.ECODE_STATE)
1414     else:
1415       self.delta = self.op.amount
1416       self.target = self.disk.size + self.delta
1417       if self.delta < 0:
1418         raise errors.OpPrereqError("Requested increment (%s) is negative" %
1419                                    utils.FormatUnit(self.delta, "h"),
1420                                    errors.ECODE_INVAL)
1421
1422     self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta))
1423
1424   def _CheckDiskSpace(self, node_uuids, req_vgspace):
1425     template = self.instance.disk_template
1426     if template not in (constants.DTS_NO_FREE_SPACE_CHECK):
1427       # TODO: check the free disk space for file, when that feature will be
1428       # supported
1429       if any(self.node_es_flags.values()):
1430         # With exclusive storage we need to something smarter than just looking
1431         # at free space; for now, let's simply abort the operation.
1432         raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
1433                                    " is enabled", errors.ECODE_STATE)
1434       CheckNodesFreeDiskPerVG(self, node_uuids, req_vgspace)
1435
1436   def Exec(self, feedback_fn):
1437     """Execute disk grow.
1438
1439     """
1440     assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1441     assert (self.owned_locks(locking.LEVEL_NODE) ==
1442             self.owned_locks(locking.LEVEL_NODE_RES))
1443
1444     wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1445
1446     disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[self.disk])
1447     if not disks_ok:
1448       raise errors.OpExecError("Cannot activate block device to grow")
1449
1450     feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1451                 (self.op.disk, self.instance.name,
1452                  utils.FormatUnit(self.delta, "h"),
1453                  utils.FormatUnit(self.target, "h")))
1454
1455     # First run all grow ops in dry-run mode
1456     for node_uuid in self.instance.all_nodes:
1457       self.cfg.SetDiskID(self.disk, node_uuid)
1458       result = self.rpc.call_blockdev_grow(node_uuid,
1459                                            (self.disk, self.instance),
1460                                            self.delta, True, True,
1461                                            self.node_es_flags[node_uuid])
1462       result.Raise("Dry-run grow request failed to node %s" %
1463                    self.cfg.GetNodeName(node_uuid))
1464
1465     if wipe_disks:
1466       # Get disk size from primary node for wiping
1467       self.cfg.SetDiskID(self.disk, self.instance.primary_node)
1468       result = self.rpc.call_blockdev_getdimensions(self.instance.primary_node,
1469                                                     [self.disk])
1470       result.Raise("Failed to retrieve disk size from node '%s'" %
1471                    self.instance.primary_node)
1472
1473       (disk_dimensions, ) = result.payload
1474
1475       if disk_dimensions is None:
1476         raise errors.OpExecError("Failed to retrieve disk size from primary"
1477                                  " node '%s'" % self.instance.primary_node)
1478       (disk_size_in_bytes, _) = disk_dimensions
1479
1480       old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
1481
1482       assert old_disk_size >= self.disk.size, \
1483         ("Retrieved disk size too small (got %s, should be at least %s)" %
1484          (old_disk_size, self.disk.size))
1485     else:
1486       old_disk_size = None
1487
1488     # We know that (as far as we can test) operations across different
1489     # nodes will succeed, time to run it for real on the backing storage
1490     for node_uuid in self.instance.all_nodes:
1491       self.cfg.SetDiskID(self.disk, node_uuid)
1492       result = self.rpc.call_blockdev_grow(node_uuid,
1493                                            (self.disk, self.instance),
1494                                            self.delta, False, True,
1495                                            self.node_es_flags[node_uuid])
1496       result.Raise("Grow request failed to node %s" %
1497                    self.cfg.GetNodeName(node_uuid))
1498
1499     # And now execute it for logical storage, on the primary node
1500     node_uuid = self.instance.primary_node
1501     self.cfg.SetDiskID(self.disk, node_uuid)
1502     result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance),
1503                                          self.delta, False, False,
1504                                          self.node_es_flags[node_uuid])
1505     result.Raise("Grow request failed to node %s" %
1506                  self.cfg.GetNodeName(node_uuid))
1507
1508     self.disk.RecordGrow(self.delta)
1509     self.cfg.Update(self.instance, feedback_fn)
1510
1511     # Changes have been recorded, release node lock
1512     ReleaseLocks(self, locking.LEVEL_NODE)
1513
1514     # Downgrade lock while waiting for sync
1515     self.glm.downgrade(locking.LEVEL_INSTANCE)
1516
1517     assert wipe_disks ^ (old_disk_size is None)
1518
1519     if wipe_disks:
1520       assert self.instance.disks[self.op.disk] == self.disk
1521
1522       # Wipe newly added disk space
1523       WipeDisks(self, self.instance,
1524                 disks=[(self.op.disk, self.disk, old_disk_size)])
1525
1526     if self.op.wait_for_sync:
1527       disk_abort = not WaitForSync(self, self.instance, disks=[self.disk])
1528       if disk_abort:
1529         self.LogWarning("Disk syncing has not returned a good status; check"
1530                         " the instance")
1531       if not self.instance.disks_active:
1532         _SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk])
1533     elif not self.instance.disks_active:
1534       self.LogWarning("Not shutting down the disk even if the instance is"
1535                       " not supposed to be running because no wait for"
1536                       " sync mode was requested")
1537
1538     assert self.owned_locks(locking.LEVEL_NODE_RES)
1539     assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1540
1541
1542 class LUInstanceReplaceDisks(LogicalUnit):
1543   """Replace the disks of an instance.
1544
1545   """
1546   HPATH = "mirrors-replace"
1547   HTYPE = constants.HTYPE_INSTANCE
1548   REQ_BGL = False
1549
1550   def CheckArguments(self):
1551     """Check arguments.
1552
1553     """
1554     if self.op.mode == constants.REPLACE_DISK_CHG:
1555       if self.op.remote_node is None and self.op.iallocator is None:
1556         raise errors.OpPrereqError("When changing the secondary either an"
1557                                    " iallocator script must be used or the"
1558                                    " new node given", errors.ECODE_INVAL)
1559       else:
1560         CheckIAllocatorOrNode(self, "iallocator", "remote_node")
1561
1562     elif self.op.remote_node is not None or self.op.iallocator is not None:
1563       # Not replacing the secondary
1564       raise errors.OpPrereqError("The iallocator and new node options can"
1565                                  " only be used when changing the"
1566                                  " secondary node", errors.ECODE_INVAL)
1567
1568   def ExpandNames(self):
1569     self._ExpandAndLockInstance()
1570
1571     assert locking.LEVEL_NODE not in self.needed_locks
1572     assert locking.LEVEL_NODE_RES not in self.needed_locks
1573     assert locking.LEVEL_NODEGROUP not in self.needed_locks
1574
1575     assert self.op.iallocator is None or self.op.remote_node is None, \
1576       "Conflicting options"
1577
1578     if self.op.remote_node is not None:
1579       (self.op.remote_node_uuid, self.op.remote_node) = \
1580         ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
1581                               self.op.remote_node)
1582
1583       # Warning: do not remove the locking of the new secondary here
1584       # unless DRBD8Dev.AddChildren is changed to work in parallel;
1585       # currently it doesn't since parallel invocations of
1586       # FindUnusedMinor will conflict
1587       self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid]
1588       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1589     else:
1590       self.needed_locks[locking.LEVEL_NODE] = []
1591       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1592
1593       if self.op.iallocator is not None:
1594         # iallocator will select a new node in the same group
1595         self.needed_locks[locking.LEVEL_NODEGROUP] = []
1596         self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1597
1598     self.needed_locks[locking.LEVEL_NODE_RES] = []
1599
1600     self.replacer = TLReplaceDisks(self, self.op.instance_uuid,
1601                                    self.op.instance_name, self.op.mode,
1602                                    self.op.iallocator, self.op.remote_node_uuid,
1603                                    self.op.disks, self.op.early_release,
1604                                    self.op.ignore_ipolicy)
1605
1606     self.tasklets = [self.replacer]
1607
1608   def DeclareLocks(self, level):
1609     if level == locking.LEVEL_NODEGROUP:
1610       assert self.op.remote_node_uuid is None
1611       assert self.op.iallocator is not None
1612       assert not self.needed_locks[locking.LEVEL_NODEGROUP]
1613
1614       self.share_locks[locking.LEVEL_NODEGROUP] = 1
1615       # Lock all groups used by instance optimistically; this requires going
1616       # via the node before it's locked, requiring verification later on
1617       self.needed_locks[locking.LEVEL_NODEGROUP] = \
1618         self.cfg.GetInstanceNodeGroups(self.op.instance_uuid)
1619
1620     elif level == locking.LEVEL_NODE:
1621       if self.op.iallocator is not None:
1622         assert self.op.remote_node_uuid is None
1623         assert not self.needed_locks[locking.LEVEL_NODE]
1624         assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
1625
1626         # Lock member nodes of all locked groups
1627         self.needed_locks[locking.LEVEL_NODE] = \
1628           [node_uuid
1629            for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1630            for node_uuid in self.cfg.GetNodeGroup(group_uuid).members]
1631       else:
1632         assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1633
1634         self._LockInstancesNodes()
1635
1636     elif level == locking.LEVEL_NODE_RES:
1637       # Reuse node locks
1638       self.needed_locks[locking.LEVEL_NODE_RES] = \
1639         self.needed_locks[locking.LEVEL_NODE]
1640
1641   def BuildHooksEnv(self):
1642     """Build hooks env.
1643
1644     This runs on the master, the primary and all the secondaries.
1645
1646     """
1647     instance = self.replacer.instance
1648     env = {
1649       "MODE": self.op.mode,
1650       "NEW_SECONDARY": self.op.remote_node,
1651       "OLD_SECONDARY": self.cfg.GetNodeName(instance.secondary_nodes[0]),
1652       }
1653     env.update(BuildInstanceHookEnvByObject(self, instance))
1654     return env
1655
1656   def BuildHooksNodes(self):
1657     """Build hooks nodes.
1658
1659     """
1660     instance = self.replacer.instance
1661     nl = [
1662       self.cfg.GetMasterNode(),
1663       instance.primary_node,
1664       ]
1665     if self.op.remote_node_uuid is not None:
1666       nl.append(self.op.remote_node_uuid)
1667     return nl, nl
1668
1669   def CheckPrereq(self):
1670     """Check prerequisites.
1671
1672     """
1673     assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
1674             self.op.iallocator is None)
1675
1676     # Verify if node group locks are still correct
1677     owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
1678     if owned_groups:
1679       CheckInstanceNodeGroups(self.cfg, self.op.instance_uuid, owned_groups)
1680
1681     return LogicalUnit.CheckPrereq(self)
1682
1683
1684 class LUInstanceActivateDisks(NoHooksLU):
1685   """Bring up an instance's disks.
1686
1687   """
1688   REQ_BGL = False
1689
1690   def ExpandNames(self):
1691     self._ExpandAndLockInstance()
1692     self.needed_locks[locking.LEVEL_NODE] = []
1693     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1694
1695   def DeclareLocks(self, level):
1696     if level == locking.LEVEL_NODE:
1697       self._LockInstancesNodes()
1698
1699   def CheckPrereq(self):
1700     """Check prerequisites.
1701
1702     This checks that the instance is in the cluster.
1703
1704     """
1705     self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1706     assert self.instance is not None, \
1707       "Cannot retrieve locked instance %s" % self.op.instance_name
1708     CheckNodeOnline(self, self.instance.primary_node)
1709
1710   def Exec(self, feedback_fn):
1711     """Activate the disks.
1712
1713     """
1714     disks_ok, disks_info = \
1715               AssembleInstanceDisks(self, self.instance,
1716                                     ignore_size=self.op.ignore_size)
1717     if not disks_ok:
1718       raise errors.OpExecError("Cannot activate block devices")
1719
1720     if self.op.wait_for_sync:
1721       if not WaitForSync(self, self.instance):
1722         self.cfg.MarkInstanceDisksInactive(self.instance.uuid)
1723         raise errors.OpExecError("Some disks of the instance are degraded!")
1724
1725     return disks_info
1726
1727
1728 class LUInstanceDeactivateDisks(NoHooksLU):
1729   """Shutdown an instance's disks.
1730
1731   """
1732   REQ_BGL = False
1733
1734   def ExpandNames(self):
1735     self._ExpandAndLockInstance()
1736     self.needed_locks[locking.LEVEL_NODE] = []
1737     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1738
1739   def DeclareLocks(self, level):
1740     if level == locking.LEVEL_NODE:
1741       self._LockInstancesNodes()
1742
1743   def CheckPrereq(self):
1744     """Check prerequisites.
1745
1746     This checks that the instance is in the cluster.
1747
1748     """
1749     self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1750     assert self.instance is not None, \
1751       "Cannot retrieve locked instance %s" % self.op.instance_name
1752
1753   def Exec(self, feedback_fn):
1754     """Deactivate the disks
1755
1756     """
1757     if self.op.force:
1758       ShutdownInstanceDisks(self, self.instance)
1759     else:
1760       _SafeShutdownInstanceDisks(self, self.instance)
1761
1762
1763 def _CheckDiskConsistencyInner(lu, instance, dev, node_uuid, on_primary,
1764                                ldisk=False):
1765   """Check that mirrors are not degraded.
1766
1767   @attention: The device has to be annotated already.
1768
1769   The ldisk parameter, if True, will change the test from the
1770   is_degraded attribute (which represents overall non-ok status for
1771   the device(s)) to the ldisk (representing the local storage status).
1772
1773   """
1774   lu.cfg.SetDiskID(dev, node_uuid)
1775
1776   result = True
1777
1778   if on_primary or dev.AssembleOnSecondary():
1779     rstats = lu.rpc.call_blockdev_find(node_uuid, dev)
1780     msg = rstats.fail_msg
1781     if msg:
1782       lu.LogWarning("Can't find disk on node %s: %s",
1783                     lu.cfg.GetNodeName(node_uuid), msg)
1784       result = False
1785     elif not rstats.payload:
1786       lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid))
1787       result = False
1788     else:
1789       if ldisk:
1790         result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
1791       else:
1792         result = result and not rstats.payload.is_degraded
1793
1794   if dev.children:
1795     for child in dev.children:
1796       result = result and _CheckDiskConsistencyInner(lu, instance, child,
1797                                                      node_uuid, on_primary)
1798
1799   return result
1800
1801
1802 def CheckDiskConsistency(lu, instance, dev, node_uuid, on_primary, ldisk=False):
1803   """Wrapper around L{_CheckDiskConsistencyInner}.
1804
1805   """
1806   (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1807   return _CheckDiskConsistencyInner(lu, instance, disk, node_uuid, on_primary,
1808                                     ldisk=ldisk)
1809
1810
1811 def _BlockdevFind(lu, node_uuid, dev, instance):
1812   """Wrapper around call_blockdev_find to annotate diskparams.
1813
1814   @param lu: A reference to the lu object
1815   @param node_uuid: The node to call out
1816   @param dev: The device to find
1817   @param instance: The instance object the device belongs to
1818   @returns The result of the rpc call
1819
1820   """
1821   (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1822   return lu.rpc.call_blockdev_find(node_uuid, disk)
1823
1824
1825 def _GenerateUniqueNames(lu, exts):
1826   """Generate a suitable LV name.
1827
1828   This will generate a logical volume name for the given instance.
1829
1830   """
1831   results = []
1832   for val in exts:
1833     new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
1834     results.append("%s%s" % (new_id, val))
1835   return results
1836
1837
1838 class TLReplaceDisks(Tasklet):
1839   """Replaces disks for an instance.
1840
1841   Note: Locking is not within the scope of this class.
1842
1843   """
1844   def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name,
1845                remote_node_uuid, disks, early_release, ignore_ipolicy):
1846     """Initializes this class.
1847
1848     """
1849     Tasklet.__init__(self, lu)
1850
1851     # Parameters
1852     self.instance_uuid = instance_uuid
1853     self.instance_name = instance_name
1854     self.mode = mode
1855     self.iallocator_name = iallocator_name
1856     self.remote_node_uuid = remote_node_uuid
1857     self.disks = disks
1858     self.early_release = early_release
1859     self.ignore_ipolicy = ignore_ipolicy
1860
1861     # Runtime data
1862     self.instance = None
1863     self.new_node_uuid = None
1864     self.target_node_uuid = None
1865     self.other_node_uuid = None
1866     self.remote_node_info = None
1867     self.node_secondary_ip = None
1868
1869   @staticmethod
1870   def _RunAllocator(lu, iallocator_name, instance_uuid,
1871                     relocate_from_node_uuids):
1872     """Compute a new secondary node using an IAllocator.
1873
1874     """
1875     req = iallocator.IAReqRelocate(
1876           inst_uuid=instance_uuid,
1877           relocate_from_node_uuids=list(relocate_from_node_uuids))
1878     ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
1879
1880     ial.Run(iallocator_name)
1881
1882     if not ial.success:
1883       raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
1884                                  " %s" % (iallocator_name, ial.info),
1885                                  errors.ECODE_NORES)
1886
1887     remote_node_name = ial.result[0]
1888     remote_node = lu.cfg.GetNodeInfoByName(remote_node_name)
1889
1890     if remote_node is None:
1891       raise errors.OpPrereqError("Node %s not found in configuration" %
1892                                  remote_node_name, errors.ECODE_NOENT)
1893
1894     lu.LogInfo("Selected new secondary for instance '%s': %s",
1895                instance_uuid, remote_node_name)
1896
1897     return remote_node.uuid
1898
1899   def _FindFaultyDisks(self, node_uuid):
1900     """Wrapper for L{FindFaultyInstanceDisks}.
1901
1902     """
1903     return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
1904                                    node_uuid, True)
1905
1906   def _CheckDisksActivated(self, instance):
1907     """Checks if the instance disks are activated.
1908
1909     @param instance: The instance to check disks
1910     @return: True if they are activated, False otherwise
1911
1912     """
1913     node_uuids = instance.all_nodes
1914
1915     for idx, dev in enumerate(instance.disks):
1916       for node_uuid in node_uuids:
1917         self.lu.LogInfo("Checking disk/%d on %s", idx,
1918                         self.cfg.GetNodeName(node_uuid))
1919         self.cfg.SetDiskID(dev, node_uuid)
1920
1921         result = _BlockdevFind(self, node_uuid, dev, instance)
1922
1923         if result.offline:
1924           continue
1925         elif result.fail_msg or not result.payload:
1926           return False
1927
1928     return True
1929
1930   def CheckPrereq(self):
1931     """Check prerequisites.
1932
1933     This checks that the instance is in the cluster.
1934
1935     """
1936     self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
1937     assert self.instance is not None, \
1938       "Cannot retrieve locked instance %s" % self.instance_name
1939
1940     if self.instance.disk_template != constants.DT_DRBD8:
1941       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
1942                                  " instances", errors.ECODE_INVAL)
1943
1944     if len(self.instance.secondary_nodes) != 1:
1945       raise errors.OpPrereqError("The instance has a strange layout,"
1946                                  " expected one secondary but found %d" %
1947                                  len(self.instance.secondary_nodes),
1948                                  errors.ECODE_FAULT)
1949
1950     secondary_node_uuid = self.instance.secondary_nodes[0]
1951
1952     if self.iallocator_name is None:
1953       remote_node_uuid = self.remote_node_uuid
1954     else:
1955       remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
1956                                             self.instance.uuid,
1957                                             self.instance.secondary_nodes)
1958
1959     if remote_node_uuid is None:
1960       self.remote_node_info = None
1961     else:
1962       assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
1963              "Remote node '%s' is not locked" % remote_node_uuid
1964
1965       self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
1966       assert self.remote_node_info is not None, \
1967         "Cannot retrieve locked node %s" % remote_node_uuid
1968
1969     if remote_node_uuid == self.instance.primary_node:
1970       raise errors.OpPrereqError("The specified node is the primary node of"
1971                                  " the instance", errors.ECODE_INVAL)
1972
1973     if remote_node_uuid == secondary_node_uuid:
1974       raise errors.OpPrereqError("The specified node is already the"
1975                                  " secondary node of the instance",
1976                                  errors.ECODE_INVAL)
1977
1978     if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
1979                                     constants.REPLACE_DISK_CHG):
1980       raise errors.OpPrereqError("Cannot specify disks to be replaced",
1981                                  errors.ECODE_INVAL)
1982
1983     if self.mode == constants.REPLACE_DISK_AUTO:
1984       if not self._CheckDisksActivated(self.instance):
1985         raise errors.OpPrereqError("Please run activate-disks on instance %s"
1986                                    " first" % self.instance_name,
1987                                    errors.ECODE_STATE)
1988       faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
1989       faulty_secondary = self._FindFaultyDisks(secondary_node_uuid)
1990
1991       if faulty_primary and faulty_secondary:
1992         raise errors.OpPrereqError("Instance %s has faulty disks on more than"
1993                                    " one node and can not be repaired"
1994                                    " automatically" % self.instance_name,
1995                                    errors.ECODE_STATE)
1996
1997       if faulty_primary:
1998         self.disks = faulty_primary
1999         self.target_node_uuid = self.instance.primary_node
2000         self.other_node_uuid = secondary_node_uuid
2001         check_nodes = [self.target_node_uuid, self.other_node_uuid]
2002       elif faulty_secondary:
2003         self.disks = faulty_secondary
2004         self.target_node_uuid = secondary_node_uuid
2005         self.other_node_uuid = self.instance.primary_node
2006         check_nodes = [self.target_node_uuid, self.other_node_uuid]
2007       else:
2008         self.disks = []
2009         check_nodes = []
2010
2011     else:
2012       # Non-automatic modes
2013       if self.mode == constants.REPLACE_DISK_PRI:
2014         self.target_node_uuid = self.instance.primary_node
2015         self.other_node_uuid = secondary_node_uuid
2016         check_nodes = [self.target_node_uuid, self.other_node_uuid]
2017
2018       elif self.mode == constants.REPLACE_DISK_SEC:
2019         self.target_node_uuid = secondary_node_uuid
2020         self.other_node_uuid = self.instance.primary_node
2021         check_nodes = [self.target_node_uuid, self.other_node_uuid]
2022
2023       elif self.mode == constants.REPLACE_DISK_CHG:
2024         self.new_node_uuid = remote_node_uuid
2025         self.other_node_uuid = self.instance.primary_node
2026         self.target_node_uuid = secondary_node_uuid
2027         check_nodes = [self.new_node_uuid, self.other_node_uuid]
2028
2029         CheckNodeNotDrained(self.lu, remote_node_uuid)
2030         CheckNodeVmCapable(self.lu, remote_node_uuid)
2031
2032         old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid)
2033         assert old_node_info is not None
2034         if old_node_info.offline and not self.early_release:
2035           # doesn't make sense to delay the release
2036           self.early_release = True
2037           self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
2038                           " early-release mode", secondary_node_uuid)
2039
2040       else:
2041         raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
2042                                      self.mode)
2043
2044       # If not specified all disks should be replaced
2045       if not self.disks:
2046         self.disks = range(len(self.instance.disks))
2047
2048     # TODO: This is ugly, but right now we can't distinguish between internal
2049     # submitted opcode and external one. We should fix that.
2050     if self.remote_node_info:
2051       # We change the node, lets verify it still meets instance policy
2052       new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
2053       cluster = self.cfg.GetClusterInfo()
2054       ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2055                                                               new_group_info)
2056       CheckTargetNodeIPolicy(self, ipolicy, self.instance,
2057                              self.remote_node_info, self.cfg,
2058                              ignore=self.ignore_ipolicy)
2059
2060     for node_uuid in check_nodes:
2061       CheckNodeOnline(self.lu, node_uuid)
2062
2063     touched_nodes = frozenset(node_uuid for node_uuid in [self.new_node_uuid,
2064                                                           self.other_node_uuid,
2065                                                           self.target_node_uuid]
2066                               if node_uuid is not None)
2067
2068     # Release unneeded node and node resource locks
2069     ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
2070     ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
2071     ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
2072
2073     # Release any owned node group
2074     ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
2075
2076     # Check whether disks are valid
2077     for disk_idx in self.disks:
2078       self.instance.FindDisk(disk_idx)
2079
2080     # Get secondary node IP addresses
2081     self.node_secondary_ip = dict((uuid, node.secondary_ip) for (uuid, node)
2082                                   in self.cfg.GetMultiNodeInfo(touched_nodes))
2083
2084   def Exec(self, feedback_fn):
2085     """Execute disk replacement.
2086
2087     This dispatches the disk replacement to the appropriate handler.
2088
2089     """
2090     if __debug__:
2091       # Verify owned locks before starting operation
2092       owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
2093       assert set(owned_nodes) == set(self.node_secondary_ip), \
2094           ("Incorrect node locks, owning %s, expected %s" %
2095            (owned_nodes, self.node_secondary_ip.keys()))
2096       assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
2097               self.lu.owned_locks(locking.LEVEL_NODE_RES))
2098       assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
2099
2100       owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
2101       assert list(owned_instances) == [self.instance_name], \
2102           "Instance '%s' not locked" % self.instance_name
2103
2104       assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
2105           "Should not own any node group lock at this point"
2106
2107     if not self.disks:
2108       feedback_fn("No disks need replacement for instance '%s'" %
2109                   self.instance.name)
2110       return
2111
2112     feedback_fn("Replacing disk(s) %s for instance '%s'" %
2113                 (utils.CommaJoin(self.disks), self.instance.name))
2114     feedback_fn("Current primary node: %s" %
2115                 self.cfg.GetNodeName(self.instance.primary_node))
2116     feedback_fn("Current seconary node: %s" %
2117                 utils.CommaJoin(self.cfg.GetNodeNames(
2118                                   self.instance.secondary_nodes)))
2119
2120     activate_disks = not self.instance.disks_active
2121
2122     # Activate the instance disks if we're replacing them on a down instance
2123     if activate_disks:
2124       StartInstanceDisks(self.lu, self.instance, True)
2125
2126     try:
2127       # Should we replace the secondary node?
2128       if self.new_node_uuid is not None:
2129         fn = self._ExecDrbd8Secondary
2130       else:
2131         fn = self._ExecDrbd8DiskOnly
2132
2133       result = fn(feedback_fn)
2134     finally:
2135       # Deactivate the instance disks if we're replacing them on a
2136       # down instance
2137       if activate_disks:
2138         _SafeShutdownInstanceDisks(self.lu, self.instance)
2139
2140     assert not self.lu.owned_locks(locking.LEVEL_NODE)
2141
2142     if __debug__:
2143       # Verify owned locks
2144       owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
2145       nodes = frozenset(self.node_secondary_ip)
2146       assert ((self.early_release and not owned_nodes) or
2147               (not self.early_release and not (set(owned_nodes) - nodes))), \
2148         ("Not owning the correct locks, early_release=%s, owned=%r,"
2149          " nodes=%r" % (self.early_release, owned_nodes, nodes))
2150
2151     return result
2152
2153   def _CheckVolumeGroup(self, node_uuids):
2154     self.lu.LogInfo("Checking volume groups")
2155
2156     vgname = self.cfg.GetVGName()
2157
2158     # Make sure volume group exists on all involved nodes
2159     results = self.rpc.call_vg_list(node_uuids)
2160     if not results:
2161       raise errors.OpExecError("Can't list volume groups on the nodes")
2162
2163     for node_uuid in node_uuids:
2164       res = results[node_uuid]
2165       res.Raise("Error checking node %s" % self.cfg.GetNodeName(node_uuid))
2166       if vgname not in res.payload:
2167         raise errors.OpExecError("Volume group '%s' not found on node %s" %
2168                                  (vgname, self.cfg.GetNodeName(node_uuid)))
2169
2170   def _CheckDisksExistence(self, node_uuids):
2171     # Check disk existence
2172     for idx, dev in enumerate(self.instance.disks):
2173       if idx not in self.disks:
2174         continue
2175
2176       for node_uuid in node_uuids:
2177         self.lu.LogInfo("Checking disk/%d on %s", idx,
2178                         self.cfg.GetNodeName(node_uuid))
2179         self.cfg.SetDiskID(dev, node_uuid)
2180
2181         result = _BlockdevFind(self, node_uuid, dev, self.instance)
2182
2183         msg = result.fail_msg
2184         if msg or not result.payload:
2185           if not msg:
2186             msg = "disk not found"
2187           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
2188                                    (idx, self.cfg.GetNodeName(node_uuid), msg))
2189
2190   def _CheckDisksConsistency(self, node_uuid, on_primary, ldisk):
2191     for idx, dev in enumerate(self.instance.disks):
2192       if idx not in self.disks:
2193         continue
2194
2195       self.lu.LogInfo("Checking disk/%d consistency on node %s" %
2196                       (idx, self.cfg.GetNodeName(node_uuid)))
2197
2198       if not CheckDiskConsistency(self.lu, self.instance, dev, node_uuid,
2199                                   on_primary, ldisk=ldisk):
2200         raise errors.OpExecError("Node %s has degraded storage, unsafe to"
2201                                  " replace disks for instance %s" %
2202                                  (self.cfg.GetNodeName(node_uuid),
2203                                   self.instance.name))
2204
2205   def _CreateNewStorage(self, node_uuid):
2206     """Create new storage on the primary or secondary node.
2207
2208     This is only used for same-node replaces, not for changing the
2209     secondary node, hence we don't want to modify the existing disk.
2210
2211     """
2212     iv_names = {}
2213
2214     disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2215     for idx, dev in enumerate(disks):
2216       if idx not in self.disks:
2217         continue
2218
2219       self.lu.LogInfo("Adding storage on %s for disk/%d",
2220                       self.cfg.GetNodeName(node_uuid), idx)
2221
2222       self.cfg.SetDiskID(dev, node_uuid)
2223
2224       lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
2225       names = _GenerateUniqueNames(self.lu, lv_names)
2226
2227       (data_disk, meta_disk) = dev.children
2228       vg_data = data_disk.logical_id[0]
2229       lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
2230                              logical_id=(vg_data, names[0]),
2231                              params=data_disk.params)
2232       vg_meta = meta_disk.logical_id[0]
2233       lv_meta = objects.Disk(dev_type=constants.LD_LV,
2234                              size=constants.DRBD_META_SIZE,
2235                              logical_id=(vg_meta, names[1]),
2236                              params=meta_disk.params)
2237
2238       new_lvs = [lv_data, lv_meta]
2239       old_lvs = [child.Copy() for child in dev.children]
2240       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
2241       excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)
2242
2243       # we pass force_create=True to force the LVM creation
2244       for new_lv in new_lvs:
2245         try:
2246           _CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True,
2247                                GetInstanceInfoText(self.instance), False,
2248                                excl_stor)
2249         except errors.DeviceCreationError, e:
2250           raise errors.OpExecError("Can't create block device: %s" % e.message)
2251
2252     return iv_names
2253
2254   def _CheckDevices(self, node_uuid, iv_names):
2255     for name, (dev, _, _) in iv_names.iteritems():
2256       self.cfg.SetDiskID(dev, node_uuid)
2257
2258       result = _BlockdevFind(self, node_uuid, dev, self.instance)
2259
2260       msg = result.fail_msg
2261       if msg or not result.payload:
2262         if not msg:
2263           msg = "disk not found"
2264         raise errors.OpExecError("Can't find DRBD device %s: %s" %
2265                                  (name, msg))
2266
2267       if result.payload.is_degraded:
2268         raise errors.OpExecError("DRBD device %s is degraded!" % name)
2269
2270   def _RemoveOldStorage(self, node_uuid, iv_names):
2271     for name, (_, old_lvs, _) in iv_names.iteritems():
2272       self.lu.LogInfo("Remove logical volumes for %s", name)
2273
2274       for lv in old_lvs:
2275         self.cfg.SetDiskID(lv, node_uuid)
2276
2277         msg = self.rpc.call_blockdev_remove(node_uuid, lv).fail_msg
2278         if msg:
2279           self.lu.LogWarning("Can't remove old LV: %s", msg,
2280                              hint="remove unused LVs manually")
2281
2282   def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
2283     """Replace a disk on the primary or secondary for DRBD 8.
2284
2285     The algorithm for replace is quite complicated:
2286
2287       1. for each disk to be replaced:
2288
2289         1. create new LVs on the target node with unique names
2290         1. detach old LVs from the drbd device
2291         1. rename old LVs to name_replaced.<time_t>
2292         1. rename new LVs to old LVs
2293         1. attach the new LVs (with the old names now) to the drbd device
2294
2295       1. wait for sync across all devices
2296
2297       1. for each modified disk:
2298
2299         1. remove old LVs (which have the name name_replaces.<time_t>)
2300
2301     Failures are not very well handled.
2302
2303     """
2304     steps_total = 6
2305
2306     # Step: check device activation
2307     self.lu.LogStep(1, steps_total, "Check device existence")
2308     self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
2309     self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])
2310
2311     # Step: check other node consistency
2312     self.lu.LogStep(2, steps_total, "Check peer consistency")
2313     self._CheckDisksConsistency(
2314       self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
2315       False)
2316
2317     # Step: create new storage
2318     self.lu.LogStep(3, steps_total, "Allocate new storage")
2319     iv_names = self._CreateNewStorage(self.target_node_uuid)
2320
2321     # Step: for each lv, detach+rename*2+attach
2322     self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2323     for dev, old_lvs, new_lvs in iv_names.itervalues():
2324       self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
2325
2326       result = self.rpc.call_blockdev_removechildren(self.target_node_uuid, dev,
2327                                                      old_lvs)
2328       result.Raise("Can't detach drbd from local storage on node"
2329                    " %s for device %s" %
2330                    (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))
2331       #dev.children = []
2332       #cfg.Update(instance)
2333
2334       # ok, we created the new LVs, so now we know we have the needed
2335       # storage; as such, we proceed on the target node to rename
2336       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
2337       # using the assumption that logical_id == physical_id (which in
2338       # turn is the unique_id on that node)
2339
2340       # FIXME(iustin): use a better name for the replaced LVs
2341       temp_suffix = int(time.time())
2342       ren_fn = lambda d, suff: (d.physical_id[0],
2343                                 d.physical_id[1] + "_replaced-%s" % suff)
2344
2345       # Build the rename list based on what LVs exist on the node
2346       rename_old_to_new = []
2347       for to_ren in old_lvs:
2348         result = self.rpc.call_blockdev_find(self.target_node_uuid, to_ren)
2349         if not result.fail_msg and result.payload:
2350           # device exists
2351           rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
2352
2353       self.lu.LogInfo("Renaming the old LVs on the target node")
2354       result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2355                                              rename_old_to_new)
2356       result.Raise("Can't rename old LVs on node %s" %
2357                    self.cfg.GetNodeName(self.target_node_uuid))
2358
2359       # Now we rename the new LVs to the old LVs
2360       self.lu.LogInfo("Renaming the new LVs on the target node")
2361       rename_new_to_old = [(new, old.physical_id)
2362                            for old, new in zip(old_lvs, new_lvs)]
2363       result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2364                                              rename_new_to_old)
2365       result.Raise("Can't rename new LVs on node %s" %
2366                    self.cfg.GetNodeName(self.target_node_uuid))
2367
2368       # Intermediate steps of in memory modifications
2369       for old, new in zip(old_lvs, new_lvs):
2370         new.logical_id = old.logical_id
2371         self.cfg.SetDiskID(new, self.target_node_uuid)
2372
2373       # We need to modify old_lvs so that removal later removes the
2374       # right LVs, not the newly added ones; note that old_lvs is a
2375       # copy here
2376       for disk in old_lvs:
2377         disk.logical_id = ren_fn(disk, temp_suffix)
2378         self.cfg.SetDiskID(disk, self.target_node_uuid)
2379
2380       # Now that the new lvs have the old name, we can add them to the device
2381       self.lu.LogInfo("Adding new mirror component on %s",
2382                       self.cfg.GetNodeName(self.target_node_uuid))
2383       result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
2384                                                   (dev, self.instance), new_lvs)
2385       msg = result.fail_msg
2386       if msg:
2387         for new_lv in new_lvs:
2388           msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
2389                                                new_lv).fail_msg
2390           if msg2:
2391             self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
2392                                hint=("cleanup manually the unused logical"
2393                                      "volumes"))
2394         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
2395
2396     cstep = itertools.count(5)
2397
2398     if self.early_release:
2399       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2400       self._RemoveOldStorage(self.target_node_uuid, iv_names)
2401       # TODO: Check if releasing locks early still makes sense
2402       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2403     else:
2404       # Release all resource locks except those used by the instance
2405       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2406                    keep=self.node_secondary_ip.keys())
2407
2408     # Release all node locks while waiting for sync
2409     ReleaseLocks(self.lu, locking.LEVEL_NODE)
2410
2411     # TODO: Can the instance lock be downgraded here? Take the optional disk
2412     # shutdown in the caller into consideration.
2413
2414     # Wait for sync
2415     # This can fail as the old devices are degraded and _WaitForSync
2416     # does a combined result over all disks, so we don't check its return value
2417     self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2418     WaitForSync(self.lu, self.instance)
2419
2420     # Check all devices manually
2421     self._CheckDevices(self.instance.primary_node, iv_names)
2422
2423     # Step: remove old storage
2424     if not self.early_release:
2425       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2426       self._RemoveOldStorage(self.target_node_uuid, iv_names)
2427
2428   def _ExecDrbd8Secondary(self, feedback_fn):
2429     """Replace the secondary node for DRBD 8.
2430
2431     The algorithm for replace is quite complicated:
2432       - for all disks of the instance:
2433         - create new LVs on the new node with same names
2434         - shutdown the drbd device on the old secondary
2435         - disconnect the drbd network on the primary
2436         - create the drbd device on the new secondary
2437         - network attach the drbd on the primary, using an artifice:
2438           the drbd code for Attach() will connect to the network if it
2439           finds a device which is connected to the good local disks but
2440           not network enabled
2441       - wait for sync across all devices
2442       - remove all disks from the old secondary
2443
2444     Failures are not very well handled.
2445
2446     """
2447     steps_total = 6
2448
2449     pnode = self.instance.primary_node
2450
2451     # Step: check device activation
2452     self.lu.LogStep(1, steps_total, "Check device existence")
2453     self._CheckDisksExistence([self.instance.primary_node])
2454     self._CheckVolumeGroup([self.instance.primary_node])
2455
2456     # Step: check other node consistency
2457     self.lu.LogStep(2, steps_total, "Check peer consistency")
2458     self._CheckDisksConsistency(self.instance.primary_node, True, True)
2459
2460     # Step: create new storage
2461     self.lu.LogStep(3, steps_total, "Allocate new storage")
2462     disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2463     excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg,
2464                                                   self.new_node_uuid)
2465     for idx, dev in enumerate(disks):
2466       self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
2467                       (self.cfg.GetNodeName(self.new_node_uuid), idx))
2468       # we pass force_create=True to force LVM creation
2469       for new_lv in dev.children:
2470         try:
2471           _CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance,
2472                                new_lv, True, GetInstanceInfoText(self.instance),
2473                                False, excl_stor)
2474         except errors.DeviceCreationError, e:
2475           raise errors.OpExecError("Can't create block device: %s" % e.message)
2476
2477     # Step 4: dbrd minors and drbd setups changes
2478     # after this, we must manually remove the drbd minors on both the
2479     # error and the success paths
2480     self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2481     minors = self.cfg.AllocateDRBDMinor([self.new_node_uuid
2482                                          for _ in self.instance.disks],
2483                                         self.instance.uuid)
2484     logging.debug("Allocated minors %r", minors)
2485
2486     iv_names = {}
2487     for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
2488       self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
2489                       (self.cfg.GetNodeName(self.new_node_uuid), idx))
2490       # create new devices on new_node; note that we create two IDs:
2491       # one without port, so the drbd will be activated without
2492       # networking information on the new node at this stage, and one
2493       # with network, for the latter activation in step 4
2494       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
2495       if self.instance.primary_node == o_node1:
2496         p_minor = o_minor1
2497       else:
2498         assert self.instance.primary_node == o_node2, "Three-node instance?"
2499         p_minor = o_minor2
2500
2501       new_alone_id = (self.instance.primary_node, self.new_node_uuid, None,
2502                       p_minor, new_minor, o_secret)
2503       new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port,
2504                     p_minor, new_minor, o_secret)
2505
2506       iv_names[idx] = (dev, dev.children, new_net_id)
2507       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
2508                     new_net_id)
2509       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
2510                               logical_id=new_alone_id,
2511                               children=dev.children,
2512                               size=dev.size,
2513                               params={})
2514       (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
2515                                             self.cfg)
2516       try:
2517         CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance,
2518                              anno_new_drbd,
2519                              GetInstanceInfoText(self.instance), False,
2520                              excl_stor)
2521       except errors.GenericError:
2522         self.cfg.ReleaseDRBDMinors(self.instance.uuid)
2523         raise
2524
2525     # We have new devices, shutdown the drbd on the old secondary
2526     for idx, dev in enumerate(self.instance.disks):
2527       self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
2528       self.cfg.SetDiskID(dev, self.target_node_uuid)
2529       msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid,
2530                                             (dev, self.instance)).fail_msg
2531       if msg:
2532         self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
2533                            "node: %s" % (idx, msg),
2534                            hint=("Please cleanup this device manually as"
2535                                  " soon as possible"))
2536
2537     self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2538     result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
2539                                                self.instance.disks)[pnode]
2540
2541     msg = result.fail_msg
2542     if msg:
2543       # detaches didn't succeed (unlikely)
2544       self.cfg.ReleaseDRBDMinors(self.instance.uuid)
2545       raise errors.OpExecError("Can't detach the disks from the network on"
2546                                " old node: %s" % (msg,))
2547
2548     # if we managed to detach at least one, we update all the disks of
2549     # the instance to point to the new secondary
2550     self.lu.LogInfo("Updating instance configuration")
2551     for dev, _, new_logical_id in iv_names.itervalues():
2552       dev.logical_id = new_logical_id
2553       self.cfg.SetDiskID(dev, self.instance.primary_node)
2554
2555     self.cfg.Update(self.instance, feedback_fn)
2556
2557     # Release all node locks (the configuration has been updated)
2558     ReleaseLocks(self.lu, locking.LEVEL_NODE)
2559
2560     # and now perform the drbd attach
2561     self.lu.LogInfo("Attaching primary drbds to new secondary"
2562                     " (standalone => connected)")
2563     result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2564                                             self.new_node_uuid],
2565                                            self.node_secondary_ip,
2566                                            (self.instance.disks, self.instance),
2567                                            self.instance.name,
2568                                            False)
2569     for to_node, to_result in result.items():
2570       msg = to_result.fail_msg
2571       if msg:
2572         self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
2573                            self.cfg.GetNodeName(to_node), msg,
2574                            hint=("please do a gnt-instance info to see the"
2575                                  " status of disks"))
2576
2577     cstep = itertools.count(5)
2578
2579     if self.early_release:
2580       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2581       self._RemoveOldStorage(self.target_node_uuid, iv_names)
2582       # TODO: Check if releasing locks early still makes sense
2583       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2584     else:
2585       # Release all resource locks except those used by the instance
2586       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2587                    keep=self.node_secondary_ip.keys())
2588
2589     # TODO: Can the instance lock be downgraded here? Take the optional disk
2590     # shutdown in the caller into consideration.
2591
2592     # Wait for sync
2593     # This can fail as the old devices are degraded and _WaitForSync
2594     # does a combined result over all disks, so we don't check its return value
2595     self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2596     WaitForSync(self.lu, self.instance)
2597
2598     # Check all devices manually
2599     self._CheckDevices(self.instance.primary_node, iv_names)
2600
2601     # Step: remove old storage
2602     if not self.early_release:
2603       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2604       self._RemoveOldStorage(self.target_node_uuid, iv_names)