(snap) Snapshot support for ExtStorage
[ganeti-local] / lib / cmdlib / instance_storage.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with storage of instances."""
23
24 import itertools
25 import logging
26 import os
27 import time
28
29 from ganeti import compat
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import ht
33 from ganeti import locking
34 from ganeti.masterd import iallocator
35 from ganeti import objects
36 from ganeti import utils
37 from ganeti import opcodes
38 from ganeti import rpc
39 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
40 from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
41   AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeName, \
42   CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
43   IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks
44 from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
45   CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
46   BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
47
48 import ganeti.masterd.instance
49
50
51 _DISK_TEMPLATE_NAME_PREFIX = {
52   constants.DT_PLAIN: "",
53   constants.DT_RBD: ".rbd",
54   constants.DT_EXT: ".ext",
55   }
56
57
58 _DISK_TEMPLATE_DEVICE_TYPE = {
59   constants.DT_PLAIN: constants.LD_LV,
60   constants.DT_FILE: constants.LD_FILE,
61   constants.DT_SHARED_FILE: constants.LD_FILE,
62   constants.DT_BLOCK: constants.LD_BLOCKDEV,
63   constants.DT_RBD: constants.LD_RBD,
64   constants.DT_EXT: constants.LD_EXT,
65   }
66
67
68 def CreateSingleBlockDev(lu, node, instance, device, info, force_open,
69                          excl_stor):
70   """Create a single block device on a given node.
71
72   This will not recurse over children of the device, so they must be
73   created in advance.
74
75   @param lu: the lu on whose behalf we execute
76   @param node: the node on which to create the device
77   @type instance: L{objects.Instance}
78   @param instance: the instance which owns the device
79   @type device: L{objects.Disk}
80   @param device: the device to create
81   @param info: the extra 'metadata' we should attach to the device
82       (this will be represented as a LVM tag)
83   @type force_open: boolean
84   @param force_open: this parameter will be passes to the
85       L{backend.BlockdevCreate} function where it specifies
86       whether we run on primary or not, and it affects both
87       the child assembly and the device own Open() execution
88   @type excl_stor: boolean
89   @param excl_stor: Whether exclusive_storage is active for the node
90
91   """
92   lu.cfg.SetDiskID(device, node)
93   result = lu.rpc.call_blockdev_create(node, device, device.size,
94                                        instance.name, force_open, info,
95                                        excl_stor)
96   result.Raise("Can't create block device %s on"
97                " node %s for instance %s" % (device, node, instance.name))
98   if device.physical_id is None:
99     device.physical_id = result.payload
100
101
102 def _CreateBlockDevInner(lu, node, instance, device, force_create,
103                          info, force_open, excl_stor):
104   """Create a tree of block devices on a given node.
105
106   If this device type has to be created on secondaries, create it and
107   all its children.
108
109   If not, just recurse to children keeping the same 'force' value.
110
111   @attention: The device has to be annotated already.
112
113   @param lu: the lu on whose behalf we execute
114   @param node: the node on which to create the device
115   @type instance: L{objects.Instance}
116   @param instance: the instance which owns the device
117   @type device: L{objects.Disk}
118   @param device: the device to create
119   @type force_create: boolean
120   @param force_create: whether to force creation of this device; this
121       will be change to True whenever we find a device which has
122       CreateOnSecondary() attribute
123   @param info: the extra 'metadata' we should attach to the device
124       (this will be represented as a LVM tag)
125   @type force_open: boolean
126   @param force_open: this parameter will be passes to the
127       L{backend.BlockdevCreate} function where it specifies
128       whether we run on primary or not, and it affects both
129       the child assembly and the device own Open() execution
130   @type excl_stor: boolean
131   @param excl_stor: Whether exclusive_storage is active for the node
132
133   @return: list of created devices
134   """
135   created_devices = []
136   try:
137     if device.CreateOnSecondary():
138       force_create = True
139
140     if device.children:
141       for child in device.children:
142         devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
143                                     info, force_open, excl_stor)
144         created_devices.extend(devs)
145
146     if not force_create:
147       return created_devices
148
149     CreateSingleBlockDev(lu, node, instance, device, info, force_open,
150                          excl_stor)
151     # The device has been completely created, so there is no point in keeping
152     # its subdevices in the list. We just add the device itself instead.
153     created_devices = [(node, device)]
154     return created_devices
155
156   except errors.DeviceCreationError, e:
157     e.created_devices.extend(created_devices)
158     raise e
159   except errors.OpExecError, e:
160     raise errors.DeviceCreationError(str(e), created_devices)
161
162
163 def IsExclusiveStorageEnabledNodeName(cfg, nodename):
164   """Whether exclusive_storage is in effect for the given node.
165
166   @type cfg: L{config.ConfigWriter}
167   @param cfg: The cluster configuration
168   @type nodename: string
169   @param nodename: The node
170   @rtype: bool
171   @return: The effective value of exclusive_storage
172   @raise errors.OpPrereqError: if no node exists with the given name
173
174   """
175   ni = cfg.GetNodeInfo(nodename)
176   if ni is None:
177     raise errors.OpPrereqError("Invalid node name %s" % nodename,
178                                errors.ECODE_NOENT)
179   return IsExclusiveStorageEnabledNode(cfg, ni)
180
181
182 def _CreateBlockDev(lu, node, instance, device, force_create, info,
183                     force_open):
184   """Wrapper around L{_CreateBlockDevInner}.
185
186   This method annotates the root device first.
187
188   """
189   (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
190   excl_stor = IsExclusiveStorageEnabledNodeName(lu.cfg, node)
191   return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
192                               force_open, excl_stor)
193
194
195 def _UndoCreateDisks(lu, disks_created):
196   """Undo the work performed by L{CreateDisks}.
197
198   This function is called in case of an error to undo the work of
199   L{CreateDisks}.
200
201   @type lu: L{LogicalUnit}
202   @param lu: the logical unit on whose behalf we execute
203   @param disks_created: the result returned by L{CreateDisks}
204
205   """
206   for (node, disk) in disks_created:
207     lu.cfg.SetDiskID(disk, node)
208     result = lu.rpc.call_blockdev_remove(node, disk)
209     if result.fail_msg:
210       logging.warning("Failed to remove newly-created disk %s on node %s:"
211                       " %s", disk, node, result.fail_msg)
212
213
214 def CreateDisks(lu, instance, to_skip=None, target_node=None, disks=None):
215   """Create all disks for an instance.
216
217   This abstracts away some work from AddInstance.
218
219   @type lu: L{LogicalUnit}
220   @param lu: the logical unit on whose behalf we execute
221   @type instance: L{objects.Instance}
222   @param instance: the instance whose disks we should create
223   @type to_skip: list
224   @param to_skip: list of indices to skip
225   @type target_node: string
226   @param target_node: if passed, overrides the target node for creation
227   @type disks: list of {objects.Disk}
228   @param disks: the disks to create; if not specified, all the disks of the
229       instance are created
230   @return: information about the created disks, to be used to call
231       L{_UndoCreateDisks}
232   @raise errors.OpPrereqError: in case of error
233
234   """
235   info = GetInstanceInfoText(instance)
236   if target_node is None:
237     pnode = instance.primary_node
238     all_nodes = instance.all_nodes
239   else:
240     pnode = target_node
241     all_nodes = [pnode]
242
243   if disks is None:
244     disks = instance.disks
245
246   if instance.disk_template in constants.DTS_FILEBASED:
247     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
248     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
249
250     result.Raise("Failed to create directory '%s' on"
251                  " node %s" % (file_storage_dir, pnode))
252
253   disks_created = []
254   for idx, device in enumerate(disks):
255     if to_skip and idx in to_skip:
256       continue
257     logging.info("Creating disk %s for instance '%s'", idx, instance.name)
258     for node in all_nodes:
259       f_create = node == pnode
260       try:
261         _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
262         disks_created.append((node, device))
263       except errors.DeviceCreationError, e:
264         logging.warning("Creating disk %s for instance '%s' failed",
265                         idx, instance.name)
266         disks_created.extend(e.created_devices)
267         _UndoCreateDisks(lu, disks_created)
268         raise errors.OpExecError(e.message)
269   return disks_created
270
271
272 def ComputeDiskSizePerVG(disk_template, disks):
273   """Compute disk size requirements in the volume group
274
275   """
276   def _compute(disks, payload):
277     """Universal algorithm.
278
279     """
280     vgs = {}
281     for disk in disks:
282       vgs[disk[constants.IDISK_VG]] = \
283         vgs.get(constants.IDISK_VG, 0) + disk[constants.IDISK_SIZE] + payload
284
285     return vgs
286
287   # Required free disk space as a function of disk and swap space
288   req_size_dict = {
289     constants.DT_DISKLESS: {},
290     constants.DT_PLAIN: _compute(disks, 0),
291     # 128 MB are added for drbd metadata for each disk
292     constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
293     constants.DT_FILE: {},
294     constants.DT_SHARED_FILE: {},
295     }
296
297   if disk_template not in req_size_dict:
298     raise errors.ProgrammerError("Disk template '%s' size requirement"
299                                  " is unknown" % disk_template)
300
301   return req_size_dict[disk_template]
302
303
304 def ComputeDisks(op, default_vg):
305   """Computes the instance disks.
306
307   @param op: The instance opcode
308   @param default_vg: The default_vg to assume
309
310   @return: The computed disks
311
312   """
313   disks = []
314   for disk in op.disks:
315     mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
316     if mode not in constants.DISK_ACCESS_SET:
317       raise errors.OpPrereqError("Invalid disk access mode '%s'" %
318                                  mode, errors.ECODE_INVAL)
319     size = disk.get(constants.IDISK_SIZE, None)
320     if size is None:
321       raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
322     try:
323       size = int(size)
324     except (TypeError, ValueError):
325       raise errors.OpPrereqError("Invalid disk size '%s'" % size,
326                                  errors.ECODE_INVAL)
327
328     ext_provider = disk.get(constants.IDISK_PROVIDER, None)
329     if ext_provider and op.disk_template != constants.DT_EXT:
330       raise errors.OpPrereqError("The '%s' option is only valid for the %s"
331                                  " disk template, not %s" %
332                                  (constants.IDISK_PROVIDER, constants.DT_EXT,
333                                   op.disk_template), errors.ECODE_INVAL)
334
335     data_vg = disk.get(constants.IDISK_VG, default_vg)
336     name = disk.get(constants.IDISK_NAME, None)
337     if name is not None and name.lower() == constants.VALUE_NONE:
338       name = None
339     new_disk = {
340       constants.IDISK_SIZE: size,
341       constants.IDISK_MODE: mode,
342       constants.IDISK_VG: data_vg,
343       constants.IDISK_NAME: name,
344       }
345
346     if constants.IDISK_METAVG in disk:
347       new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG]
348     if constants.IDISK_ADOPT in disk:
349       new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]
350
351     # For extstorage, demand the `provider' option and add any
352     # additional parameters (ext-params) to the dict
353     if op.disk_template == constants.DT_EXT:
354       if ext_provider:
355         new_disk[constants.IDISK_PROVIDER] = ext_provider
356         for key in disk:
357           if key not in constants.IDISK_PARAMS:
358             new_disk[key] = disk[key]
359       else:
360         raise errors.OpPrereqError("Missing provider for template '%s'" %
361                                    constants.DT_EXT, errors.ECODE_INVAL)
362
363     disks.append(new_disk)
364
365   return disks
366
367
368 def CheckRADOSFreeSpace():
369   """Compute disk size requirements inside the RADOS cluster.
370
371   """
372   # For the RADOS cluster we assume there is always enough space.
373   pass
374
375
376 def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
377                          iv_name, p_minor, s_minor):
378   """Generate a drbd8 device complete with its children.
379
380   """
381   assert len(vgnames) == len(names) == 2
382   port = lu.cfg.AllocatePort()
383   shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
384
385   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
386                           logical_id=(vgnames[0], names[0]),
387                           params={})
388   dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
389   dev_meta = objects.Disk(dev_type=constants.LD_LV,
390                           size=constants.DRBD_META_SIZE,
391                           logical_id=(vgnames[1], names[1]),
392                           params={})
393   dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
394   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
395                           logical_id=(primary, secondary, port,
396                                       p_minor, s_minor,
397                                       shared_secret),
398                           children=[dev_data, dev_meta],
399                           iv_name=iv_name, params={})
400   drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
401   return drbd_dev
402
403
404 def GenerateDiskTemplate(
405   lu, template_name, instance_name, primary_node, secondary_nodes,
406   disk_info, file_storage_dir, file_driver, base_index,
407   feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
408   _req_shr_file_storage=opcodes.RequireSharedFileStorage):
409   """Generate the entire disk layout for a given template type.
410
411   """
412   vgname = lu.cfg.GetVGName()
413   disk_count = len(disk_info)
414   disks = []
415
416   if template_name == constants.DT_DISKLESS:
417     pass
418   elif template_name == constants.DT_DRBD8:
419     if len(secondary_nodes) != 1:
420       raise errors.ProgrammerError("Wrong template configuration")
421     remote_node = secondary_nodes[0]
422     minors = lu.cfg.AllocateDRBDMinor(
423       [primary_node, remote_node] * len(disk_info), instance_name)
424
425     (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
426                                                        full_disk_params)
427     drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
428
429     names = []
430     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
431                                                for i in range(disk_count)]):
432       names.append(lv_prefix + "_data")
433       names.append(lv_prefix + "_meta")
434     for idx, disk in enumerate(disk_info):
435       disk_index = idx + base_index
436       data_vg = disk.get(constants.IDISK_VG, vgname)
437       meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
438       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
439                                       disk[constants.IDISK_SIZE],
440                                       [data_vg, meta_vg],
441                                       names[idx * 2:idx * 2 + 2],
442                                       "disk/%d" % disk_index,
443                                       minors[idx * 2], minors[idx * 2 + 1])
444       disk_dev.mode = disk[constants.IDISK_MODE]
445       disk_dev.name = disk.get(constants.IDISK_NAME, None)
446       disks.append(disk_dev)
447   else:
448     if secondary_nodes:
449       raise errors.ProgrammerError("Wrong template configuration")
450
451     if template_name == constants.DT_FILE:
452       _req_file_storage()
453     elif template_name == constants.DT_SHARED_FILE:
454       _req_shr_file_storage()
455
456     name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
457     if name_prefix is None:
458       names = None
459     else:
460       names = _GenerateUniqueNames(lu, ["%s.disk%s" %
461                                         (name_prefix, base_index + i)
462                                         for i in range(disk_count)])
463
464     if template_name == constants.DT_PLAIN:
465
466       def logical_id_fn(idx, _, disk):
467         vg = disk.get(constants.IDISK_VG, vgname)
468         return (vg, names[idx])
469
470     elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
471       logical_id_fn = \
472         lambda _, disk_index, disk: (file_driver,
473                                      "%s/disk%d" % (file_storage_dir,
474                                                     disk_index))
475     elif template_name == constants.DT_BLOCK:
476       logical_id_fn = \
477         lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
478                                        disk[constants.IDISK_ADOPT])
479     elif template_name == constants.DT_RBD:
480       logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
481     elif template_name == constants.DT_EXT:
482       def logical_id_fn(idx, _, disk):
483         provider = disk.get(constants.IDISK_PROVIDER, None)
484         if provider is None:
485           raise errors.ProgrammerError("Disk template is %s, but '%s' is"
486                                        " not found", constants.DT_EXT,
487                                        constants.IDISK_PROVIDER)
488         return (provider, names[idx])
489     else:
490       raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
491
492     dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
493
494     for idx, disk in enumerate(disk_info):
495       params = {}
496       # Only for the Ext template add disk_info to params
497       if template_name == constants.DT_EXT:
498         params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
499         for key in disk:
500           if key not in constants.IDISK_PARAMS:
501             params[key] = disk[key]
502       disk_index = idx + base_index
503       size = disk[constants.IDISK_SIZE]
504       feedback_fn("* disk %s, size %s" %
505                   (disk_index, utils.FormatUnit(size, "h")))
506       disk_dev = objects.Disk(dev_type=dev_type, size=size,
507                               logical_id=logical_id_fn(idx, disk_index, disk),
508                               iv_name="disk/%d" % disk_index,
509                               mode=disk[constants.IDISK_MODE],
510                               params=params)
511       disk_dev.name = disk.get(constants.IDISK_NAME, None)
512       disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
513       disks.append(disk_dev)
514
515   return disks
516
517
518 class LUInstanceRecreateDisks(LogicalUnit):
519   """Recreate an instance's missing disks.
520
521   """
522   HPATH = "instance-recreate-disks"
523   HTYPE = constants.HTYPE_INSTANCE
524   REQ_BGL = False
525
526   _MODIFYABLE = compat.UniqueFrozenset([
527     constants.IDISK_SIZE,
528     constants.IDISK_MODE,
529     ])
530
531   # New or changed disk parameters may have different semantics
532   assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
533     constants.IDISK_ADOPT,
534
535     # TODO: Implement support changing VG while recreating
536     constants.IDISK_VG,
537     constants.IDISK_METAVG,
538     constants.IDISK_PROVIDER,
539     constants.IDISK_NAME,
540     constants.IDISK_SNAPSHOT_NAME,
541     ]))
542
543   def _RunAllocator(self):
544     """Run the allocator based on input opcode.
545
546     """
547     be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
548
549     # FIXME
550     # The allocator should actually run in "relocate" mode, but current
551     # allocators don't support relocating all the nodes of an instance at
552     # the same time. As a workaround we use "allocate" mode, but this is
553     # suboptimal for two reasons:
554     # - The instance name passed to the allocator is present in the list of
555     #   existing instances, so there could be a conflict within the
556     #   internal structures of the allocator. This doesn't happen with the
557     #   current allocators, but it's a liability.
558     # - The allocator counts the resources used by the instance twice: once
559     #   because the instance exists already, and once because it tries to
560     #   allocate a new instance.
561     # The allocator could choose some of the nodes on which the instance is
562     # running, but that's not a problem. If the instance nodes are broken,
563     # they should be already be marked as drained or offline, and hence
564     # skipped by the allocator. If instance disks have been lost for other
565     # reasons, then recreating the disks on the same nodes should be fine.
566     disk_template = self.instance.disk_template
567     spindle_use = be_full[constants.BE_SPINDLE_USE]
568     req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
569                                         disk_template=disk_template,
570                                         tags=list(self.instance.GetTags()),
571                                         os=self.instance.os,
572                                         nics=[{}],
573                                         vcpus=be_full[constants.BE_VCPUS],
574                                         memory=be_full[constants.BE_MAXMEM],
575                                         spindle_use=spindle_use,
576                                         disks=[{constants.IDISK_SIZE: d.size,
577                                                 constants.IDISK_MODE: d.mode}
578                                                for d in self.instance.disks],
579                                         hypervisor=self.instance.hypervisor,
580                                         node_whitelist=None)
581     ial = iallocator.IAllocator(self.cfg, self.rpc, req)
582
583     ial.Run(self.op.iallocator)
584
585     assert req.RequiredNodes() == len(self.instance.all_nodes)
586
587     if not ial.success:
588       raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
589                                  " %s" % (self.op.iallocator, ial.info),
590                                  errors.ECODE_NORES)
591
592     self.op.nodes = ial.result
593     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
594                  self.op.instance_name, self.op.iallocator,
595                  utils.CommaJoin(ial.result))
596
597   def CheckArguments(self):
598     if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
599       # Normalize and convert deprecated list of disk indices
600       self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
601
602     duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
603     if duplicates:
604       raise errors.OpPrereqError("Some disks have been specified more than"
605                                  " once: %s" % utils.CommaJoin(duplicates),
606                                  errors.ECODE_INVAL)
607
608     # We don't want _CheckIAllocatorOrNode selecting the default iallocator
609     # when neither iallocator nor nodes are specified
610     if self.op.iallocator or self.op.nodes:
611       CheckIAllocatorOrNode(self, "iallocator", "nodes")
612
613     for (idx, params) in self.op.disks:
614       utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
615       unsupported = frozenset(params.keys()) - self._MODIFYABLE
616       if unsupported:
617         raise errors.OpPrereqError("Parameters for disk %s try to change"
618                                    " unmodifyable parameter(s): %s" %
619                                    (idx, utils.CommaJoin(unsupported)),
620                                    errors.ECODE_INVAL)
621
622   def ExpandNames(self):
623     self._ExpandAndLockInstance()
624     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
625
626     if self.op.nodes:
627       self.op.nodes = [ExpandNodeName(self.cfg, n) for n in self.op.nodes]
628       self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
629     else:
630       self.needed_locks[locking.LEVEL_NODE] = []
631       if self.op.iallocator:
632         # iallocator will select a new node in the same group
633         self.needed_locks[locking.LEVEL_NODEGROUP] = []
634         self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
635
636     self.needed_locks[locking.LEVEL_NODE_RES] = []
637
638   def DeclareLocks(self, level):
639     if level == locking.LEVEL_NODEGROUP:
640       assert self.op.iallocator is not None
641       assert not self.op.nodes
642       assert not self.needed_locks[locking.LEVEL_NODEGROUP]
643       self.share_locks[locking.LEVEL_NODEGROUP] = 1
644       # Lock the primary group used by the instance optimistically; this
645       # requires going via the node before it's locked, requiring
646       # verification later on
647       self.needed_locks[locking.LEVEL_NODEGROUP] = \
648         self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)
649
650     elif level == locking.LEVEL_NODE:
651       # If an allocator is used, then we lock all the nodes in the current
652       # instance group, as we don't know yet which ones will be selected;
653       # if we replace the nodes without using an allocator, locks are
654       # already declared in ExpandNames; otherwise, we need to lock all the
655       # instance nodes for disk re-creation
656       if self.op.iallocator:
657         assert not self.op.nodes
658         assert not self.needed_locks[locking.LEVEL_NODE]
659         assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
660
661         # Lock member nodes of the group of the primary node
662         for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
663           self.needed_locks[locking.LEVEL_NODE].extend(
664             self.cfg.GetNodeGroup(group_uuid).members)
665
666         assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
667       elif not self.op.nodes:
668         self._LockInstancesNodes(primary_only=False)
669     elif level == locking.LEVEL_NODE_RES:
670       # Copy node locks
671       self.needed_locks[locking.LEVEL_NODE_RES] = \
672         CopyLockList(self.needed_locks[locking.LEVEL_NODE])
673
674   def BuildHooksEnv(self):
675     """Build hooks env.
676
677     This runs on master, primary and secondary nodes of the instance.
678
679     """
680     return BuildInstanceHookEnvByObject(self, self.instance)
681
682   def BuildHooksNodes(self):
683     """Build hooks nodes.
684
685     """
686     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
687     return (nl, nl)
688
689   def CheckPrereq(self):
690     """Check prerequisites.
691
692     This checks that the instance is in the cluster and is not running.
693
694     """
695     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
696     assert instance is not None, \
697       "Cannot retrieve locked instance %s" % self.op.instance_name
698     if self.op.nodes:
699       if len(self.op.nodes) != len(instance.all_nodes):
700         raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
701                                    " %d replacement nodes were specified" %
702                                    (instance.name, len(instance.all_nodes),
703                                     len(self.op.nodes)),
704                                    errors.ECODE_INVAL)
705       assert instance.disk_template != constants.DT_DRBD8 or \
706              len(self.op.nodes) == 2
707       assert instance.disk_template != constants.DT_PLAIN or \
708              len(self.op.nodes) == 1
709       primary_node = self.op.nodes[0]
710     else:
711       primary_node = instance.primary_node
712     if not self.op.iallocator:
713       CheckNodeOnline(self, primary_node)
714
715     if instance.disk_template == constants.DT_DISKLESS:
716       raise errors.OpPrereqError("Instance '%s' has no disks" %
717                                  self.op.instance_name, errors.ECODE_INVAL)
718
719     # Verify if node group locks are still correct
720     owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
721     if owned_groups:
722       # Node group locks are acquired only for the primary node (and only
723       # when the allocator is used)
724       CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
725                               primary_only=True)
726
727     # if we replace nodes *and* the old primary is offline, we don't
728     # check the instance state
729     old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
730     if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
731       CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
732                          msg="cannot recreate disks")
733
734     if self.op.disks:
735       self.disks = dict(self.op.disks)
736     else:
737       self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
738
739     maxidx = max(self.disks.keys())
740     if maxidx >= len(instance.disks):
741       raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
742                                  errors.ECODE_INVAL)
743
744     if ((self.op.nodes or self.op.iallocator) and
745          sorted(self.disks.keys()) != range(len(instance.disks))):
746       raise errors.OpPrereqError("Can't recreate disks partially and"
747                                  " change the nodes at the same time",
748                                  errors.ECODE_INVAL)
749
750     self.instance = instance
751
752     if self.op.iallocator:
753       self._RunAllocator()
754       # Release unneeded node and node resource locks
755       ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
756       ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
757       ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
758
759     assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
760
761   def Exec(self, feedback_fn):
762     """Recreate the disks.
763
764     """
765     instance = self.instance
766
767     assert (self.owned_locks(locking.LEVEL_NODE) ==
768             self.owned_locks(locking.LEVEL_NODE_RES))
769
770     to_skip = []
771     mods = [] # keeps track of needed changes
772
773     for idx, disk in enumerate(instance.disks):
774       try:
775         changes = self.disks[idx]
776       except KeyError:
777         # Disk should not be recreated
778         to_skip.append(idx)
779         continue
780
781       # update secondaries for disks, if needed
782       if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
783         # need to update the nodes and minors
784         assert len(self.op.nodes) == 2
785         assert len(disk.logical_id) == 6 # otherwise disk internals
786                                          # have changed
787         (_, _, old_port, _, _, old_secret) = disk.logical_id
788         new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
789         new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
790                   new_minors[0], new_minors[1], old_secret)
791         assert len(disk.logical_id) == len(new_id)
792       else:
793         new_id = None
794
795       mods.append((idx, new_id, changes))
796
797     # now that we have passed all asserts above, we can apply the mods
798     # in a single run (to avoid partial changes)
799     for idx, new_id, changes in mods:
800       disk = instance.disks[idx]
801       if new_id is not None:
802         assert disk.dev_type == constants.LD_DRBD8
803         disk.logical_id = new_id
804       if changes:
805         disk.Update(size=changes.get(constants.IDISK_SIZE, None),
806                     mode=changes.get(constants.IDISK_MODE, None))
807
808     # change primary node, if needed
809     if self.op.nodes:
810       instance.primary_node = self.op.nodes[0]
811       self.LogWarning("Changing the instance's nodes, you will have to"
812                       " remove any disks left on the older nodes manually")
813
814     if self.op.nodes:
815       self.cfg.Update(instance, feedback_fn)
816
817     # All touched nodes must be locked
818     mylocks = self.owned_locks(locking.LEVEL_NODE)
819     assert mylocks.issuperset(frozenset(instance.all_nodes))
820     new_disks = CreateDisks(self, instance, to_skip=to_skip)
821
822     # TODO: Release node locks before wiping, or explain why it's not possible
823     if self.cfg.GetClusterInfo().prealloc_wipe_disks:
824       wipedisks = [(idx, disk, 0)
825                    for (idx, disk) in enumerate(instance.disks)
826                    if idx not in to_skip]
827       WipeOrCleanupDisks(self, instance, disks=wipedisks, cleanup=new_disks)
828
829
830 def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
831   """Checks if nodes have enough free disk space in the specified VG.
832
833   This function checks if all given nodes have the needed amount of
834   free disk. In case any node has less disk or we cannot get the
835   information from the node, this function raises an OpPrereqError
836   exception.
837
838   @type lu: C{LogicalUnit}
839   @param lu: a logical unit from which we get configuration data
840   @type nodenames: C{list}
841   @param nodenames: the list of node names to check
842   @type vg: C{str}
843   @param vg: the volume group to check
844   @type requested: C{int}
845   @param requested: the amount of disk in MiB to check for
846   @raise errors.OpPrereqError: if the node doesn't have enough disk,
847       or we cannot check the node
848
849   """
850   es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
851   nodeinfo = lu.rpc.call_node_info(nodenames, [vg], None, es_flags)
852   for node in nodenames:
853     info = nodeinfo[node]
854     info.Raise("Cannot get current information from node %s" % node,
855                prereq=True, ecode=errors.ECODE_ENVIRON)
856     (_, (vg_info, ), _) = info.payload
857     vg_free = vg_info.get("vg_free", None)
858     if not isinstance(vg_free, int):
859       raise errors.OpPrereqError("Can't compute free disk space on node"
860                                  " %s for vg %s, result was '%s'" %
861                                  (node, vg, vg_free), errors.ECODE_ENVIRON)
862     if requested > vg_free:
863       raise errors.OpPrereqError("Not enough disk space on target node %s"
864                                  " vg %s: required %d MiB, available %d MiB" %
865                                  (node, vg, requested, vg_free),
866                                  errors.ECODE_NORES)
867
868
869 def CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
870   """Checks if nodes have enough free disk space in all the VGs.
871
872   This function checks if all given nodes have the needed amount of
873   free disk. In case any node has less disk or we cannot get the
874   information from the node, this function raises an OpPrereqError
875   exception.
876
877   @type lu: C{LogicalUnit}
878   @param lu: a logical unit from which we get configuration data
879   @type nodenames: C{list}
880   @param nodenames: the list of node names to check
881   @type req_sizes: C{dict}
882   @param req_sizes: the hash of vg and corresponding amount of disk in
883       MiB to check for
884   @raise errors.OpPrereqError: if the node doesn't have enough disk,
885       or we cannot check the node
886
887   """
888   for vg, req_size in req_sizes.items():
889     _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
890
891
892 def _DiskSizeInBytesToMebibytes(lu, size):
893   """Converts a disk size in bytes to mebibytes.
894
895   Warns and rounds up if the size isn't an even multiple of 1 MiB.
896
897   """
898   (mib, remainder) = divmod(size, 1024 * 1024)
899
900   if remainder != 0:
901     lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
902                   " to not overwrite existing data (%s bytes will not be"
903                   " wiped)", (1024 * 1024) - remainder)
904     mib += 1
905
906   return mib
907
908
909 def _CalcEta(time_taken, written, total_size):
910   """Calculates the ETA based on size written and total size.
911
912   @param time_taken: The time taken so far
913   @param written: amount written so far
914   @param total_size: The total size of data to be written
915   @return: The remaining time in seconds
916
917   """
918   avg_time = time_taken / float(written)
919   return (total_size - written) * avg_time
920
921
922 def WipeDisks(lu, instance, disks=None):
923   """Wipes instance disks.
924
925   @type lu: L{LogicalUnit}
926   @param lu: the logical unit on whose behalf we execute
927   @type instance: L{objects.Instance}
928   @param instance: the instance whose disks we should create
929   @type disks: None or list of tuple of (number, L{objects.Disk}, number)
930   @param disks: Disk details; tuple contains disk index, disk object and the
931     start offset
932
933   """
934   node = instance.primary_node
935
936   if disks is None:
937     disks = [(idx, disk, 0)
938              for (idx, disk) in enumerate(instance.disks)]
939
940   for (_, device, _) in disks:
941     lu.cfg.SetDiskID(device, node)
942
943   logging.info("Pausing synchronization of disks of instance '%s'",
944                instance.name)
945   result = lu.rpc.call_blockdev_pause_resume_sync(node,
946                                                   (map(compat.snd, disks),
947                                                    instance),
948                                                   True)
949   result.Raise("Failed to pause disk synchronization on node '%s'" % node)
950
951   for idx, success in enumerate(result.payload):
952     if not success:
953       logging.warn("Pausing synchronization of disk %s of instance '%s'"
954                    " failed", idx, instance.name)
955
956   try:
957     for (idx, device, offset) in disks:
958       # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
959       # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
960       wipe_chunk_size = \
961         int(min(constants.MAX_WIPE_CHUNK,
962                 device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
963
964       size = device.size
965       last_output = 0
966       start_time = time.time()
967
968       if offset == 0:
969         info_text = ""
970       else:
971         info_text = (" (from %s to %s)" %
972                      (utils.FormatUnit(offset, "h"),
973                       utils.FormatUnit(size, "h")))
974
975       lu.LogInfo("* Wiping disk %s%s", idx, info_text)
976
977       logging.info("Wiping disk %d for instance %s on node %s using"
978                    " chunk size %s", idx, instance.name, node, wipe_chunk_size)
979
980       while offset < size:
981         wipe_size = min(wipe_chunk_size, size - offset)
982
983         logging.debug("Wiping disk %d, offset %s, chunk %s",
984                       idx, offset, wipe_size)
985
986         result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
987                                            wipe_size)
988         result.Raise("Could not wipe disk %d at offset %d for size %d" %
989                      (idx, offset, wipe_size))
990
991         now = time.time()
992         offset += wipe_size
993         if now - last_output >= 60:
994           eta = _CalcEta(now - start_time, offset, size)
995           lu.LogInfo(" - done: %.1f%% ETA: %s",
996                      offset / float(size) * 100, utils.FormatSeconds(eta))
997           last_output = now
998   finally:
999     logging.info("Resuming synchronization of disks for instance '%s'",
1000                  instance.name)
1001
1002     result = lu.rpc.call_blockdev_pause_resume_sync(node,
1003                                                     (map(compat.snd, disks),
1004                                                      instance),
1005                                                     False)
1006
1007     if result.fail_msg:
1008       lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
1009                     node, result.fail_msg)
1010     else:
1011       for idx, success in enumerate(result.payload):
1012         if not success:
1013           lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
1014                         " failed", idx, instance.name)
1015
1016
1017 def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
1018   """Wrapper for L{WipeDisks} that handles errors.
1019
1020   @type lu: L{LogicalUnit}
1021   @param lu: the logical unit on whose behalf we execute
1022   @type instance: L{objects.Instance}
1023   @param instance: the instance whose disks we should wipe
1024   @param disks: see L{WipeDisks}
1025   @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
1026       case of error
1027   @raise errors.OpPrereqError: in case of failure
1028
1029   """
1030   try:
1031     WipeDisks(lu, instance, disks=disks)
1032   except errors.OpExecError:
1033     logging.warning("Wiping disks for instance '%s' failed",
1034                     instance.name)
1035     _UndoCreateDisks(lu, cleanup)
1036     raise
1037
1038
1039 def ExpandCheckDisks(instance, disks):
1040   """Return the instance disks selected by the disks list
1041
1042   @type disks: list of L{objects.Disk} or None
1043   @param disks: selected disks
1044   @rtype: list of L{objects.Disk}
1045   @return: selected instance disks to act on
1046
1047   """
1048   if disks is None:
1049     return instance.disks
1050   else:
1051     if not set(disks).issubset(instance.disks):
1052       raise errors.ProgrammerError("Can only act on disks belonging to the"
1053                                    " target instance: expected a subset of %r,"
1054                                    " got %r" % (instance.disks, disks))
1055     return disks
1056
1057
1058 def WaitForSync(lu, instance, disks=None, oneshot=False):
1059   """Sleep and poll for an instance's disk to sync.
1060
1061   """
1062   if not instance.disks or disks is not None and not disks:
1063     return True
1064
1065   disks = ExpandCheckDisks(instance, disks)
1066
1067   if not oneshot:
1068     lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1069
1070   node = instance.primary_node
1071
1072   for dev in disks:
1073     lu.cfg.SetDiskID(dev, node)
1074
1075   # TODO: Convert to utils.Retry
1076
1077   retries = 0
1078   degr_retries = 10 # in seconds, as we sleep 1 second each time
1079   while True:
1080     max_time = 0
1081     done = True
1082     cumul_degraded = False
1083     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
1084     msg = rstats.fail_msg
1085     if msg:
1086       lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1087       retries += 1
1088       if retries >= 10:
1089         raise errors.RemoteError("Can't contact node %s for mirror data,"
1090                                  " aborting." % node)
1091       time.sleep(6)
1092       continue
1093     rstats = rstats.payload
1094     retries = 0
1095     for i, mstat in enumerate(rstats):
1096       if mstat is None:
1097         lu.LogWarning("Can't compute data for node %s/%s",
1098                       node, disks[i].iv_name)
1099         continue
1100
1101       cumul_degraded = (cumul_degraded or
1102                         (mstat.is_degraded and mstat.sync_percent is None))
1103       if mstat.sync_percent is not None:
1104         done = False
1105         if mstat.estimated_time is not None:
1106           rem_time = ("%s remaining (estimated)" %
1107                       utils.FormatSeconds(mstat.estimated_time))
1108           max_time = mstat.estimated_time
1109         else:
1110           rem_time = "no time estimate"
1111         lu.LogInfo("- device %s: %5.2f%% done, %s",
1112                    disks[i].iv_name, mstat.sync_percent, rem_time)
1113
1114     # if we're done but degraded, let's do a few small retries, to
1115     # make sure we see a stable and not transient situation; therefore
1116     # we force restart of the loop
1117     if (done or oneshot) and cumul_degraded and degr_retries > 0:
1118       logging.info("Degraded disks found, %d retries left", degr_retries)
1119       degr_retries -= 1
1120       time.sleep(1)
1121       continue
1122
1123     if done or oneshot:
1124       break
1125
1126     time.sleep(min(60, max_time))
1127
1128   if done:
1129     lu.LogInfo("Instance %s's disks are in sync", instance.name)
1130
1131   return not cumul_degraded
1132
1133
1134 def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1135   """Shutdown block devices of an instance.
1136
1137   This does the shutdown on all nodes of the instance.
1138
1139   If the ignore_primary is false, errors on the primary node are
1140   ignored.
1141
1142   """
1143   all_result = True
1144
1145   if disks is None:
1146     # only mark instance disks as inactive if all disks are affected
1147     lu.cfg.MarkInstanceDisksInactive(instance.name)
1148
1149   disks = ExpandCheckDisks(instance, disks)
1150
1151   for disk in disks:
1152     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1153       lu.cfg.SetDiskID(top_disk, node)
1154       result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
1155       msg = result.fail_msg
1156       if msg:
1157         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1158                       disk.iv_name, node, msg)
1159         if ((node == instance.primary_node and not ignore_primary) or
1160             (node != instance.primary_node and not result.offline)):
1161           all_result = False
1162   return all_result
1163
1164
1165 def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1166   """Shutdown block devices of an instance.
1167
1168   This function checks if an instance is running, before calling
1169   _ShutdownInstanceDisks.
1170
1171   """
1172   CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1173   ShutdownInstanceDisks(lu, instance, disks=disks)
1174
1175
1176 def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1177                           ignore_size=False):
1178   """Prepare the block devices for an instance.
1179
1180   This sets up the block devices on all nodes.
1181
1182   @type lu: L{LogicalUnit}
1183   @param lu: the logical unit on whose behalf we execute
1184   @type instance: L{objects.Instance}
1185   @param instance: the instance for whose disks we assemble
1186   @type disks: list of L{objects.Disk} or None
1187   @param disks: which disks to assemble (or all, if None)
1188   @type ignore_secondaries: boolean
1189   @param ignore_secondaries: if true, errors on secondary nodes
1190       won't result in an error return from the function
1191   @type ignore_size: boolean
1192   @param ignore_size: if true, the current known size of the disk
1193       will not be used during the disk activation, useful for cases
1194       when the size is wrong
1195   @return: False if the operation failed, otherwise a list of
1196       (host, instance_visible_name, node_visible_name)
1197       with the mapping from node devices to instance devices
1198
1199   """
1200   device_info = []
1201   disks_ok = True
1202   iname = instance.name
1203
1204   if disks is None:
1205     # only mark instance disks as active if all disks are affected
1206     lu.cfg.MarkInstanceDisksActive(iname)
1207
1208   disks = ExpandCheckDisks(instance, disks)
1209
1210   # With the two passes mechanism we try to reduce the window of
1211   # opportunity for the race condition of switching DRBD to primary
1212   # before handshaking occured, but we do not eliminate it
1213
1214   # The proper fix would be to wait (with some limits) until the
1215   # connection has been made and drbd transitions from WFConnection
1216   # into any other network-connected state (Connected, SyncTarget,
1217   # SyncSource, etc.)
1218
1219   # 1st pass, assemble on all nodes in secondary mode
1220   for idx, inst_disk in enumerate(disks):
1221     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1222       if ignore_size:
1223         node_disk = node_disk.Copy()
1224         node_disk.UnsetSize()
1225       lu.cfg.SetDiskID(node_disk, node)
1226       result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1227                                              False, idx)
1228       msg = result.fail_msg
1229       if msg:
1230         is_offline_secondary = (node in instance.secondary_nodes and
1231                                 result.offline)
1232         lu.LogWarning("Could not prepare block device %s on node %s"
1233                       " (is_primary=False, pass=1): %s",
1234                       inst_disk.iv_name, node, msg)
1235         if not (ignore_secondaries or is_offline_secondary):
1236           disks_ok = False
1237
1238   # FIXME: race condition on drbd migration to primary
1239
1240   # 2nd pass, do only the primary node
1241   for idx, inst_disk in enumerate(disks):
1242     dev_path = None
1243
1244     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1245       if node != instance.primary_node:
1246         continue
1247       if ignore_size:
1248         node_disk = node_disk.Copy()
1249         node_disk.UnsetSize()
1250       lu.cfg.SetDiskID(node_disk, node)
1251       result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1252                                              True, idx)
1253       msg = result.fail_msg
1254       if msg:
1255         lu.LogWarning("Could not prepare block device %s on node %s"
1256                       " (is_primary=True, pass=2): %s",
1257                       inst_disk.iv_name, node, msg)
1258         disks_ok = False
1259       else:
1260         dev_path, _ = result.payload
1261
1262     device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
1263
1264   # leave the disks configured for the primary node
1265   # this is a workaround that would be fixed better by
1266   # improving the logical/physical id handling
1267   for disk in disks:
1268     lu.cfg.SetDiskID(disk, instance.primary_node)
1269
1270   if not disks_ok:
1271     lu.cfg.MarkInstanceDisksInactive(iname)
1272
1273   return disks_ok, device_info
1274
1275
1276 def StartInstanceDisks(lu, instance, force):
1277   """Start the disks of an instance.
1278
1279   """
1280   disks_ok, _ = AssembleInstanceDisks(lu, instance,
1281                                       ignore_secondaries=force)
1282   if not disks_ok:
1283     ShutdownInstanceDisks(lu, instance)
1284     if force is not None and not force:
1285       lu.LogWarning("",
1286                     hint=("If the message above refers to a secondary node,"
1287                           " you can retry the operation using '--force'"))
1288     raise errors.OpExecError("Disk consistency error")
1289
1290
1291 class LUInstanceGrowDisk(LogicalUnit):
1292   """Grow a disk of an instance.
1293
1294   """
1295   HPATH = "disk-grow"
1296   HTYPE = constants.HTYPE_INSTANCE
1297   REQ_BGL = False
1298
1299   def ExpandNames(self):
1300     self._ExpandAndLockInstance()
1301     self.needed_locks[locking.LEVEL_NODE] = []
1302     self.needed_locks[locking.LEVEL_NODE_RES] = []
1303     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1304     self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
1305
1306   def DeclareLocks(self, level):
1307     if level == locking.LEVEL_NODE:
1308       self._LockInstancesNodes()
1309     elif level == locking.LEVEL_NODE_RES:
1310       # Copy node locks
1311       self.needed_locks[locking.LEVEL_NODE_RES] = \
1312         CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1313
1314   def BuildHooksEnv(self):
1315     """Build hooks env.
1316
1317     This runs on the master, the primary and all the secondaries.
1318
1319     """
1320     env = {
1321       "DISK": self.op.disk,
1322       "AMOUNT": self.op.amount,
1323       "ABSOLUTE": self.op.absolute,
1324       }
1325     env.update(BuildInstanceHookEnvByObject(self, self.instance))
1326     return env
1327
1328   def BuildHooksNodes(self):
1329     """Build hooks nodes.
1330
1331     """
1332     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1333     return (nl, nl)
1334
1335   def CheckPrereq(self):
1336     """Check prerequisites.
1337
1338     This checks that the instance is in the cluster.
1339
1340     """
1341     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1342     assert instance is not None, \
1343       "Cannot retrieve locked instance %s" % self.op.instance_name
1344     nodenames = list(instance.all_nodes)
1345     for node in nodenames:
1346       CheckNodeOnline(self, node)
1347
1348     self.instance = instance
1349
1350     if instance.disk_template not in constants.DTS_GROWABLE:
1351       raise errors.OpPrereqError("Instance's disk layout does not support"
1352                                  " growing", errors.ECODE_INVAL)
1353
1354     self.disk = instance.FindDisk(self.op.disk)
1355
1356     if self.op.absolute:
1357       self.target = self.op.amount
1358       self.delta = self.target - self.disk.size
1359       if self.delta < 0:
1360         raise errors.OpPrereqError("Requested size (%s) is smaller than "
1361                                    "current disk size (%s)" %
1362                                    (utils.FormatUnit(self.target, "h"),
1363                                     utils.FormatUnit(self.disk.size, "h")),
1364                                    errors.ECODE_STATE)
1365     else:
1366       self.delta = self.op.amount
1367       self.target = self.disk.size + self.delta
1368       if self.delta < 0:
1369         raise errors.OpPrereqError("Requested increment (%s) is negative" %
1370                                    utils.FormatUnit(self.delta, "h"),
1371                                    errors.ECODE_INVAL)
1372
1373     self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))
1374
1375   def _CheckDiskSpace(self, nodenames, req_vgspace):
1376     template = self.instance.disk_template
1377     if template not in (constants.DTS_NO_FREE_SPACE_CHECK):
1378       # TODO: check the free disk space for file, when that feature will be
1379       # supported
1380       nodes = map(self.cfg.GetNodeInfo, nodenames)
1381       es_nodes = filter(lambda n: IsExclusiveStorageEnabledNode(self.cfg, n),
1382                         nodes)
1383       if es_nodes:
1384         # With exclusive storage we need to something smarter than just looking
1385         # at free space; for now, let's simply abort the operation.
1386         raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
1387                                    " is enabled", errors.ECODE_STATE)
1388       CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)
1389
1390   def Exec(self, feedback_fn):
1391     """Execute disk grow.
1392
1393     """
1394     instance = self.instance
1395     disk = self.disk
1396
1397     assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1398     assert (self.owned_locks(locking.LEVEL_NODE) ==
1399             self.owned_locks(locking.LEVEL_NODE_RES))
1400
1401     wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1402
1403     disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[disk])
1404     if not disks_ok:
1405       raise errors.OpExecError("Cannot activate block device to grow")
1406
1407     feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1408                 (self.op.disk, instance.name,
1409                  utils.FormatUnit(self.delta, "h"),
1410                  utils.FormatUnit(self.target, "h")))
1411
1412     # First run all grow ops in dry-run mode
1413     for node in instance.all_nodes:
1414       self.cfg.SetDiskID(disk, node)
1415       result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1416                                            True, True)
1417       result.Raise("Dry-run grow request failed to node %s" % node)
1418
1419     if wipe_disks:
1420       # Get disk size from primary node for wiping
1421       self.cfg.SetDiskID(disk, instance.primary_node)
1422       result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk])
1423       result.Raise("Failed to retrieve disk size from node '%s'" %
1424                    instance.primary_node)
1425
1426       (disk_size_in_bytes, ) = result.payload
1427
1428       if disk_size_in_bytes is None:
1429         raise errors.OpExecError("Failed to retrieve disk size from primary"
1430                                  " node '%s'" % instance.primary_node)
1431
1432       old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
1433
1434       assert old_disk_size >= disk.size, \
1435         ("Retrieved disk size too small (got %s, should be at least %s)" %
1436          (old_disk_size, disk.size))
1437     else:
1438       old_disk_size = None
1439
1440     # We know that (as far as we can test) operations across different
1441     # nodes will succeed, time to run it for real on the backing storage
1442     for node in instance.all_nodes:
1443       self.cfg.SetDiskID(disk, node)
1444       result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1445                                            False, True)
1446       result.Raise("Grow request failed to node %s" % node)
1447
1448     # And now execute it for logical storage, on the primary node
1449     node = instance.primary_node
1450     self.cfg.SetDiskID(disk, node)
1451     result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1452                                          False, False)
1453     result.Raise("Grow request failed to node %s" % node)
1454
1455     disk.RecordGrow(self.delta)
1456     self.cfg.Update(instance, feedback_fn)
1457
1458     # Changes have been recorded, release node lock
1459     ReleaseLocks(self, locking.LEVEL_NODE)
1460
1461     # Downgrade lock while waiting for sync
1462     self.glm.downgrade(locking.LEVEL_INSTANCE)
1463
1464     assert wipe_disks ^ (old_disk_size is None)
1465
1466     if wipe_disks:
1467       assert instance.disks[self.op.disk] == disk
1468
1469       # Wipe newly added disk space
1470       WipeDisks(self, instance,
1471                 disks=[(self.op.disk, disk, old_disk_size)])
1472
1473     if self.op.wait_for_sync:
1474       disk_abort = not WaitForSync(self, instance, disks=[disk])
1475       if disk_abort:
1476         self.LogWarning("Disk syncing has not returned a good status; check"
1477                         " the instance")
1478       if not instance.disks_active:
1479         _SafeShutdownInstanceDisks(self, instance, disks=[disk])
1480     elif not instance.disks_active:
1481       self.LogWarning("Not shutting down the disk even if the instance is"
1482                       " not supposed to be running because no wait for"
1483                       " sync mode was requested")
1484
1485     assert self.owned_locks(locking.LEVEL_NODE_RES)
1486     assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1487
1488
1489 class LUInstanceReplaceDisks(LogicalUnit):
1490   """Replace the disks of an instance.
1491
1492   """
1493   HPATH = "mirrors-replace"
1494   HTYPE = constants.HTYPE_INSTANCE
1495   REQ_BGL = False
1496
1497   def CheckArguments(self):
1498     """Check arguments.
1499
1500     """
1501     remote_node = self.op.remote_node
1502     ialloc = self.op.iallocator
1503     if self.op.mode == constants.REPLACE_DISK_CHG:
1504       if remote_node is None and ialloc is None:
1505         raise errors.OpPrereqError("When changing the secondary either an"
1506                                    " iallocator script must be used or the"
1507                                    " new node given", errors.ECODE_INVAL)
1508       else:
1509         CheckIAllocatorOrNode(self, "iallocator", "remote_node")
1510
1511     elif remote_node is not None or ialloc is not None:
1512       # Not replacing the secondary
1513       raise errors.OpPrereqError("The iallocator and new node options can"
1514                                  " only be used when changing the"
1515                                  " secondary node", errors.ECODE_INVAL)
1516
1517   def ExpandNames(self):
1518     self._ExpandAndLockInstance()
1519
1520     assert locking.LEVEL_NODE not in self.needed_locks
1521     assert locking.LEVEL_NODE_RES not in self.needed_locks
1522     assert locking.LEVEL_NODEGROUP not in self.needed_locks
1523
1524     assert self.op.iallocator is None or self.op.remote_node is None, \
1525       "Conflicting options"
1526
1527     if self.op.remote_node is not None:
1528       self.op.remote_node = ExpandNodeName(self.cfg, self.op.remote_node)
1529
1530       # Warning: do not remove the locking of the new secondary here
1531       # unless DRBD8.AddChildren is changed to work in parallel;
1532       # currently it doesn't since parallel invocations of
1533       # FindUnusedMinor will conflict
1534       self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
1535       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1536     else:
1537       self.needed_locks[locking.LEVEL_NODE] = []
1538       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1539
1540       if self.op.iallocator is not None:
1541         # iallocator will select a new node in the same group
1542         self.needed_locks[locking.LEVEL_NODEGROUP] = []
1543         self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1544
1545     self.needed_locks[locking.LEVEL_NODE_RES] = []
1546
1547     self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
1548                                    self.op.iallocator, self.op.remote_node,
1549                                    self.op.disks, self.op.early_release,
1550                                    self.op.ignore_ipolicy)
1551
1552     self.tasklets = [self.replacer]
1553
1554   def DeclareLocks(self, level):
1555     if level == locking.LEVEL_NODEGROUP:
1556       assert self.op.remote_node is None
1557       assert self.op.iallocator is not None
1558       assert not self.needed_locks[locking.LEVEL_NODEGROUP]
1559
1560       self.share_locks[locking.LEVEL_NODEGROUP] = 1
1561       # Lock all groups used by instance optimistically; this requires going
1562       # via the node before it's locked, requiring verification later on
1563       self.needed_locks[locking.LEVEL_NODEGROUP] = \
1564         self.cfg.GetInstanceNodeGroups(self.op.instance_name)
1565
1566     elif level == locking.LEVEL_NODE:
1567       if self.op.iallocator is not None:
1568         assert self.op.remote_node is None
1569         assert not self.needed_locks[locking.LEVEL_NODE]
1570         assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
1571
1572         # Lock member nodes of all locked groups
1573         self.needed_locks[locking.LEVEL_NODE] = \
1574           [node_name
1575            for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1576            for node_name in self.cfg.GetNodeGroup(group_uuid).members]
1577       else:
1578         assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1579
1580         self._LockInstancesNodes()
1581
1582     elif level == locking.LEVEL_NODE_RES:
1583       # Reuse node locks
1584       self.needed_locks[locking.LEVEL_NODE_RES] = \
1585         self.needed_locks[locking.LEVEL_NODE]
1586
1587   def BuildHooksEnv(self):
1588     """Build hooks env.
1589
1590     This runs on the master, the primary and all the secondaries.
1591
1592     """
1593     instance = self.replacer.instance
1594     env = {
1595       "MODE": self.op.mode,
1596       "NEW_SECONDARY": self.op.remote_node,
1597       "OLD_SECONDARY": instance.secondary_nodes[0],
1598       }
1599     env.update(BuildInstanceHookEnvByObject(self, instance))
1600     return env
1601
1602   def BuildHooksNodes(self):
1603     """Build hooks nodes.
1604
1605     """
1606     instance = self.replacer.instance
1607     nl = [
1608       self.cfg.GetMasterNode(),
1609       instance.primary_node,
1610       ]
1611     if self.op.remote_node is not None:
1612       nl.append(self.op.remote_node)
1613     return nl, nl
1614
1615   def CheckPrereq(self):
1616     """Check prerequisites.
1617
1618     """
1619     assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
1620             self.op.iallocator is None)
1621
1622     # Verify if node group locks are still correct
1623     owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
1624     if owned_groups:
1625       CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
1626
1627     return LogicalUnit.CheckPrereq(self)
1628
1629
1630 class LUInstanceActivateDisks(NoHooksLU):
1631   """Bring up an instance's disks.
1632
1633   """
1634   REQ_BGL = False
1635
1636   def ExpandNames(self):
1637     self._ExpandAndLockInstance()
1638     self.needed_locks[locking.LEVEL_NODE] = []
1639     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1640
1641   def DeclareLocks(self, level):
1642     if level == locking.LEVEL_NODE:
1643       self._LockInstancesNodes()
1644
1645   def CheckPrereq(self):
1646     """Check prerequisites.
1647
1648     This checks that the instance is in the cluster.
1649
1650     """
1651     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1652     assert self.instance is not None, \
1653       "Cannot retrieve locked instance %s" % self.op.instance_name
1654     CheckNodeOnline(self, self.instance.primary_node)
1655
1656   def Exec(self, feedback_fn):
1657     """Activate the disks.
1658
1659     """
1660     disks_ok, disks_info = \
1661               AssembleInstanceDisks(self, self.instance,
1662                                     ignore_size=self.op.ignore_size)
1663     if not disks_ok:
1664       raise errors.OpExecError("Cannot activate block devices")
1665
1666     if self.op.wait_for_sync:
1667       if not WaitForSync(self, self.instance):
1668         self.cfg.MarkInstanceDisksInactive(self.instance.name)
1669         raise errors.OpExecError("Some disks of the instance are degraded!")
1670
1671     return disks_info
1672
1673
1674 class LUInstanceDeactivateDisks(NoHooksLU):
1675   """Shutdown an instance's disks.
1676
1677   """
1678   REQ_BGL = False
1679
1680   def ExpandNames(self):
1681     self._ExpandAndLockInstance()
1682     self.needed_locks[locking.LEVEL_NODE] = []
1683     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1684
1685   def DeclareLocks(self, level):
1686     if level == locking.LEVEL_NODE:
1687       self._LockInstancesNodes()
1688
1689   def CheckPrereq(self):
1690     """Check prerequisites.
1691
1692     This checks that the instance is in the cluster.
1693
1694     """
1695     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1696     assert self.instance is not None, \
1697       "Cannot retrieve locked instance %s" % self.op.instance_name
1698
1699   def Exec(self, feedback_fn):
1700     """Deactivate the disks
1701
1702     """
1703     instance = self.instance
1704     if self.op.force:
1705       ShutdownInstanceDisks(self, instance)
1706     else:
1707       _SafeShutdownInstanceDisks(self, instance)
1708
1709
1710 def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
1711                                ldisk=False):
1712   """Check that mirrors are not degraded.
1713
1714   @attention: The device has to be annotated already.
1715
1716   The ldisk parameter, if True, will change the test from the
1717   is_degraded attribute (which represents overall non-ok status for
1718   the device(s)) to the ldisk (representing the local storage status).
1719
1720   """
1721   lu.cfg.SetDiskID(dev, node)
1722
1723   result = True
1724
1725   if on_primary or dev.AssembleOnSecondary():
1726     rstats = lu.rpc.call_blockdev_find(node, dev)
1727     msg = rstats.fail_msg
1728     if msg:
1729       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1730       result = False
1731     elif not rstats.payload:
1732       lu.LogWarning("Can't find disk on node %s", node)
1733       result = False
1734     else:
1735       if ldisk:
1736         result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
1737       else:
1738         result = result and not rstats.payload.is_degraded
1739
1740   if dev.children:
1741     for child in dev.children:
1742       result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
1743                                                      on_primary)
1744
1745   return result
1746
1747
1748 def CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
1749   """Wrapper around L{_CheckDiskConsistencyInner}.
1750
1751   """
1752   (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1753   return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
1754                                     ldisk=ldisk)
1755
1756
1757 def _BlockdevFind(lu, node, dev, instance):
1758   """Wrapper around call_blockdev_find to annotate diskparams.
1759
1760   @param lu: A reference to the lu object
1761   @param node: The node to call out
1762   @param dev: The device to find
1763   @param instance: The instance object the device belongs to
1764   @returns The result of the rpc call
1765
1766   """
1767   (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1768   return lu.rpc.call_blockdev_find(node, disk)
1769
1770
1771 def _GenerateUniqueNames(lu, exts):
1772   """Generate a suitable LV name.
1773
1774   This will generate a logical volume name for the given instance.
1775
1776   """
1777   results = []
1778   for val in exts:
1779     new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
1780     results.append("%s%s" % (new_id, val))
1781   return results
1782
1783
1784 class TLReplaceDisks(Tasklet):
1785   """Replaces disks for an instance.
1786
1787   Note: Locking is not within the scope of this class.
1788
1789   """
1790   def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
1791                disks, early_release, ignore_ipolicy):
1792     """Initializes this class.
1793
1794     """
1795     Tasklet.__init__(self, lu)
1796
1797     # Parameters
1798     self.instance_name = instance_name
1799     self.mode = mode
1800     self.iallocator_name = iallocator_name
1801     self.remote_node = remote_node
1802     self.disks = disks
1803     self.early_release = early_release
1804     self.ignore_ipolicy = ignore_ipolicy
1805
1806     # Runtime data
1807     self.instance = None
1808     self.new_node = None
1809     self.target_node = None
1810     self.other_node = None
1811     self.remote_node_info = None
1812     self.node_secondary_ip = None
1813
1814   @staticmethod
1815   def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
1816     """Compute a new secondary node using an IAllocator.
1817
1818     """
1819     req = iallocator.IAReqRelocate(name=instance_name,
1820                                    relocate_from=list(relocate_from))
1821     ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
1822
1823     ial.Run(iallocator_name)
1824
1825     if not ial.success:
1826       raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
1827                                  " %s" % (iallocator_name, ial.info),
1828                                  errors.ECODE_NORES)
1829
1830     remote_node_name = ial.result[0]
1831
1832     lu.LogInfo("Selected new secondary for instance '%s': %s",
1833                instance_name, remote_node_name)
1834
1835     return remote_node_name
1836
1837   def _FindFaultyDisks(self, node_name):
1838     """Wrapper for L{FindFaultyInstanceDisks}.
1839
1840     """
1841     return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
1842                                    node_name, True)
1843
1844   def _CheckDisksActivated(self, instance):
1845     """Checks if the instance disks are activated.
1846
1847     @param instance: The instance to check disks
1848     @return: True if they are activated, False otherwise
1849
1850     """
1851     nodes = instance.all_nodes
1852
1853     for idx, dev in enumerate(instance.disks):
1854       for node in nodes:
1855         self.lu.LogInfo("Checking disk/%d on %s", idx, node)
1856         self.cfg.SetDiskID(dev, node)
1857
1858         result = _BlockdevFind(self, node, dev, instance)
1859
1860         if result.offline:
1861           continue
1862         elif result.fail_msg or not result.payload:
1863           return False
1864
1865     return True
1866
1867   def CheckPrereq(self):
1868     """Check prerequisites.
1869
1870     This checks that the instance is in the cluster.
1871
1872     """
1873     self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
1874     assert instance is not None, \
1875       "Cannot retrieve locked instance %s" % self.instance_name
1876
1877     if instance.disk_template != constants.DT_DRBD8:
1878       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
1879                                  " instances", errors.ECODE_INVAL)
1880
1881     if len(instance.secondary_nodes) != 1:
1882       raise errors.OpPrereqError("The instance has a strange layout,"
1883                                  " expected one secondary but found %d" %
1884                                  len(instance.secondary_nodes),
1885                                  errors.ECODE_FAULT)
1886
1887     instance = self.instance
1888     secondary_node = instance.secondary_nodes[0]
1889
1890     if self.iallocator_name is None:
1891       remote_node = self.remote_node
1892     else:
1893       remote_node = self._RunAllocator(self.lu, self.iallocator_name,
1894                                        instance.name, instance.secondary_nodes)
1895
1896     if remote_node is None:
1897       self.remote_node_info = None
1898     else:
1899       assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \
1900              "Remote node '%s' is not locked" % remote_node
1901
1902       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
1903       assert self.remote_node_info is not None, \
1904         "Cannot retrieve locked node %s" % remote_node
1905
1906     if remote_node == self.instance.primary_node:
1907       raise errors.OpPrereqError("The specified node is the primary node of"
1908                                  " the instance", errors.ECODE_INVAL)
1909
1910     if remote_node == secondary_node:
1911       raise errors.OpPrereqError("The specified node is already the"
1912                                  " secondary node of the instance",
1913                                  errors.ECODE_INVAL)
1914
1915     if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
1916                                     constants.REPLACE_DISK_CHG):
1917       raise errors.OpPrereqError("Cannot specify disks to be replaced",
1918                                  errors.ECODE_INVAL)
1919
1920     if self.mode == constants.REPLACE_DISK_AUTO:
1921       if not self._CheckDisksActivated(instance):
1922         raise errors.OpPrereqError("Please run activate-disks on instance %s"
1923                                    " first" % self.instance_name,
1924                                    errors.ECODE_STATE)
1925       faulty_primary = self._FindFaultyDisks(instance.primary_node)
1926       faulty_secondary = self._FindFaultyDisks(secondary_node)
1927
1928       if faulty_primary and faulty_secondary:
1929         raise errors.OpPrereqError("Instance %s has faulty disks on more than"
1930                                    " one node and can not be repaired"
1931                                    " automatically" % self.instance_name,
1932                                    errors.ECODE_STATE)
1933
1934       if faulty_primary:
1935         self.disks = faulty_primary
1936         self.target_node = instance.primary_node
1937         self.other_node = secondary_node
1938         check_nodes = [self.target_node, self.other_node]
1939       elif faulty_secondary:
1940         self.disks = faulty_secondary
1941         self.target_node = secondary_node
1942         self.other_node = instance.primary_node
1943         check_nodes = [self.target_node, self.other_node]
1944       else:
1945         self.disks = []
1946         check_nodes = []
1947
1948     else:
1949       # Non-automatic modes
1950       if self.mode == constants.REPLACE_DISK_PRI:
1951         self.target_node = instance.primary_node
1952         self.other_node = secondary_node
1953         check_nodes = [self.target_node, self.other_node]
1954
1955       elif self.mode == constants.REPLACE_DISK_SEC:
1956         self.target_node = secondary_node
1957         self.other_node = instance.primary_node
1958         check_nodes = [self.target_node, self.other_node]
1959
1960       elif self.mode == constants.REPLACE_DISK_CHG:
1961         self.new_node = remote_node
1962         self.other_node = instance.primary_node
1963         self.target_node = secondary_node
1964         check_nodes = [self.new_node, self.other_node]
1965
1966         CheckNodeNotDrained(self.lu, remote_node)
1967         CheckNodeVmCapable(self.lu, remote_node)
1968
1969         old_node_info = self.cfg.GetNodeInfo(secondary_node)
1970         assert old_node_info is not None
1971         if old_node_info.offline and not self.early_release:
1972           # doesn't make sense to delay the release
1973           self.early_release = True
1974           self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
1975                           " early-release mode", secondary_node)
1976
1977       else:
1978         raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
1979                                      self.mode)
1980
1981       # If not specified all disks should be replaced
1982       if not self.disks:
1983         self.disks = range(len(self.instance.disks))
1984
1985     # TODO: This is ugly, but right now we can't distinguish between internal
1986     # submitted opcode and external one. We should fix that.
1987     if self.remote_node_info:
1988       # We change the node, lets verify it still meets instance policy
1989       new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
1990       cluster = self.cfg.GetClusterInfo()
1991       ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1992                                                               new_group_info)
1993       CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
1994                              self.cfg, ignore=self.ignore_ipolicy)
1995
1996     for node in check_nodes:
1997       CheckNodeOnline(self.lu, node)
1998
1999     touched_nodes = frozenset(node_name for node_name in [self.new_node,
2000                                                           self.other_node,
2001                                                           self.target_node]
2002                               if node_name is not None)
2003
2004     # Release unneeded node and node resource locks
2005     ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
2006     ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
2007     ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
2008
2009     # Release any owned node group
2010     ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
2011
2012     # Check whether disks are valid
2013     for disk_idx in self.disks:
2014       instance.FindDisk(disk_idx)
2015
2016     # Get secondary node IP addresses
2017     self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node)
2018                                   in self.cfg.GetMultiNodeInfo(touched_nodes))
2019
2020   def Exec(self, feedback_fn):
2021     """Execute disk replacement.
2022
2023     This dispatches the disk replacement to the appropriate handler.
2024
2025     """
2026     if __debug__:
2027       # Verify owned locks before starting operation
2028       owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
2029       assert set(owned_nodes) == set(self.node_secondary_ip), \
2030           ("Incorrect node locks, owning %s, expected %s" %
2031            (owned_nodes, self.node_secondary_ip.keys()))
2032       assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
2033               self.lu.owned_locks(locking.LEVEL_NODE_RES))
2034       assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
2035
2036       owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
2037       assert list(owned_instances) == [self.instance_name], \
2038           "Instance '%s' not locked" % self.instance_name
2039
2040       assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
2041           "Should not own any node group lock at this point"
2042
2043     if not self.disks:
2044       feedback_fn("No disks need replacement for instance '%s'" %
2045                   self.instance.name)
2046       return
2047
2048     feedback_fn("Replacing disk(s) %s for instance '%s'" %
2049                 (utils.CommaJoin(self.disks), self.instance.name))
2050     feedback_fn("Current primary node: %s" % self.instance.primary_node)
2051     feedback_fn("Current seconary node: %s" %
2052                 utils.CommaJoin(self.instance.secondary_nodes))
2053
2054     activate_disks = not self.instance.disks_active
2055
2056     # Activate the instance disks if we're replacing them on a down instance
2057     if activate_disks:
2058       StartInstanceDisks(self.lu, self.instance, True)
2059
2060     try:
2061       # Should we replace the secondary node?
2062       if self.new_node is not None:
2063         fn = self._ExecDrbd8Secondary
2064       else:
2065         fn = self._ExecDrbd8DiskOnly
2066
2067       result = fn(feedback_fn)
2068     finally:
2069       # Deactivate the instance disks if we're replacing them on a
2070       # down instance
2071       if activate_disks:
2072         _SafeShutdownInstanceDisks(self.lu, self.instance)
2073
2074     assert not self.lu.owned_locks(locking.LEVEL_NODE)
2075
2076     if __debug__:
2077       # Verify owned locks
2078       owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
2079       nodes = frozenset(self.node_secondary_ip)
2080       assert ((self.early_release and not owned_nodes) or
2081               (not self.early_release and not (set(owned_nodes) - nodes))), \
2082         ("Not owning the correct locks, early_release=%s, owned=%r,"
2083          " nodes=%r" % (self.early_release, owned_nodes, nodes))
2084
2085     return result
2086
2087   def _CheckVolumeGroup(self, nodes):
2088     self.lu.LogInfo("Checking volume groups")
2089
2090     vgname = self.cfg.GetVGName()
2091
2092     # Make sure volume group exists on all involved nodes
2093     results = self.rpc.call_vg_list(nodes)
2094     if not results:
2095       raise errors.OpExecError("Can't list volume groups on the nodes")
2096
2097     for node in nodes:
2098       res = results[node]
2099       res.Raise("Error checking node %s" % node)
2100       if vgname not in res.payload:
2101         raise errors.OpExecError("Volume group '%s' not found on node %s" %
2102                                  (vgname, node))
2103
2104   def _CheckDisksExistence(self, nodes):
2105     # Check disk existence
2106     for idx, dev in enumerate(self.instance.disks):
2107       if idx not in self.disks:
2108         continue
2109
2110       for node in nodes:
2111         self.lu.LogInfo("Checking disk/%d on %s", idx, node)
2112         self.cfg.SetDiskID(dev, node)
2113
2114         result = _BlockdevFind(self, node, dev, self.instance)
2115
2116         msg = result.fail_msg
2117         if msg or not result.payload:
2118           if not msg:
2119             msg = "disk not found"
2120           if not self._CheckDisksActivated(self.instance):
2121             extra_hint = ("\nDisks seem to be not properly activated. Try"
2122                           " running activate-disks on the instance before"
2123                           " using replace-disks.")
2124           else:
2125             extra_hint = ""
2126           raise errors.OpExecError("Can't find disk/%d on node %s: %s%s" %
2127                                    (idx, node, msg, extra_hint))
2128
2129   def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
2130     for idx, dev in enumerate(self.instance.disks):
2131       if idx not in self.disks:
2132         continue
2133
2134       self.lu.LogInfo("Checking disk/%d consistency on node %s" %
2135                       (idx, node_name))
2136
2137       if not CheckDiskConsistency(self.lu, self.instance, dev, node_name,
2138                                   on_primary, ldisk=ldisk):
2139         raise errors.OpExecError("Node %s has degraded storage, unsafe to"
2140                                  " replace disks for instance %s" %
2141                                  (node_name, self.instance.name))
2142
2143   def _CreateNewStorage(self, node_name):
2144     """Create new storage on the primary or secondary node.
2145
2146     This is only used for same-node replaces, not for changing the
2147     secondary node, hence we don't want to modify the existing disk.
2148
2149     """
2150     iv_names = {}
2151
2152     disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2153     for idx, dev in enumerate(disks):
2154       if idx not in self.disks:
2155         continue
2156
2157       self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx)
2158
2159       self.cfg.SetDiskID(dev, node_name)
2160
2161       lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
2162       names = _GenerateUniqueNames(self.lu, lv_names)
2163
2164       (data_disk, meta_disk) = dev.children
2165       vg_data = data_disk.logical_id[0]
2166       lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
2167                              logical_id=(vg_data, names[0]),
2168                              params=data_disk.params)
2169       vg_meta = meta_disk.logical_id[0]
2170       lv_meta = objects.Disk(dev_type=constants.LD_LV,
2171                              size=constants.DRBD_META_SIZE,
2172                              logical_id=(vg_meta, names[1]),
2173                              params=meta_disk.params)
2174
2175       new_lvs = [lv_data, lv_meta]
2176       old_lvs = [child.Copy() for child in dev.children]
2177       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
2178       excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)
2179
2180       # we pass force_create=True to force the LVM creation
2181       for new_lv in new_lvs:
2182         try:
2183           _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
2184                                GetInstanceInfoText(self.instance), False,
2185                                excl_stor)
2186         except errors.DeviceCreationError, e:
2187           raise errors.OpExecError("Can't create block device: %s" % e.message)
2188
2189     return iv_names
2190
2191   def _CheckDevices(self, node_name, iv_names):
2192     for name, (dev, _, _) in iv_names.iteritems():
2193       self.cfg.SetDiskID(dev, node_name)
2194
2195       result = _BlockdevFind(self, node_name, dev, self.instance)
2196
2197       msg = result.fail_msg
2198       if msg or not result.payload:
2199         if not msg:
2200           msg = "disk not found"
2201         raise errors.OpExecError("Can't find DRBD device %s: %s" %
2202                                  (name, msg))
2203
2204       if result.payload.is_degraded:
2205         raise errors.OpExecError("DRBD device %s is degraded!" % name)
2206
2207   def _RemoveOldStorage(self, node_name, iv_names):
2208     for name, (_, old_lvs, _) in iv_names.iteritems():
2209       self.lu.LogInfo("Remove logical volumes for %s", name)
2210
2211       for lv in old_lvs:
2212         self.cfg.SetDiskID(lv, node_name)
2213
2214         msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
2215         if msg:
2216           self.lu.LogWarning("Can't remove old LV: %s", msg,
2217                              hint="remove unused LVs manually")
2218
2219   def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
2220     """Replace a disk on the primary or secondary for DRBD 8.
2221
2222     The algorithm for replace is quite complicated:
2223
2224       1. for each disk to be replaced:
2225
2226         1. create new LVs on the target node with unique names
2227         1. detach old LVs from the drbd device
2228         1. rename old LVs to name_replaced.<time_t>
2229         1. rename new LVs to old LVs
2230         1. attach the new LVs (with the old names now) to the drbd device
2231
2232       1. wait for sync across all devices
2233
2234       1. for each modified disk:
2235
2236         1. remove old LVs (which have the name name_replaces.<time_t>)
2237
2238     Failures are not very well handled.
2239
2240     """
2241     steps_total = 6
2242
2243     # Step: check device activation
2244     self.lu.LogStep(1, steps_total, "Check device existence")
2245     self._CheckDisksExistence([self.other_node, self.target_node])
2246     self._CheckVolumeGroup([self.target_node, self.other_node])
2247
2248     # Step: check other node consistency
2249     self.lu.LogStep(2, steps_total, "Check peer consistency")
2250     self._CheckDisksConsistency(self.other_node,
2251                                 self.other_node == self.instance.primary_node,
2252                                 False)
2253
2254     # Step: create new storage
2255     self.lu.LogStep(3, steps_total, "Allocate new storage")
2256     iv_names = self._CreateNewStorage(self.target_node)
2257
2258     # Step: for each lv, detach+rename*2+attach
2259     self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2260     for dev, old_lvs, new_lvs in iv_names.itervalues():
2261       self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
2262
2263       result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
2264                                                      old_lvs)
2265       result.Raise("Can't detach drbd from local storage on node"
2266                    " %s for device %s" % (self.target_node, dev.iv_name))
2267       #dev.children = []
2268       #cfg.Update(instance)
2269
2270       # ok, we created the new LVs, so now we know we have the needed
2271       # storage; as such, we proceed on the target node to rename
2272       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
2273       # using the assumption that logical_id == physical_id (which in
2274       # turn is the unique_id on that node)
2275
2276       # FIXME(iustin): use a better name for the replaced LVs
2277       temp_suffix = int(time.time())
2278       ren_fn = lambda d, suff: (d.physical_id[0],
2279                                 d.physical_id[1] + "_replaced-%s" % suff)
2280
2281       # Build the rename list based on what LVs exist on the node
2282       rename_old_to_new = []
2283       for to_ren in old_lvs:
2284         result = self.rpc.call_blockdev_find(self.target_node, to_ren)
2285         if not result.fail_msg and result.payload:
2286           # device exists
2287           rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
2288
2289       self.lu.LogInfo("Renaming the old LVs on the target node")
2290       result = self.rpc.call_blockdev_rename(self.target_node,
2291                                              rename_old_to_new)
2292       result.Raise("Can't rename old LVs on node %s" % self.target_node)
2293
2294       # Now we rename the new LVs to the old LVs
2295       self.lu.LogInfo("Renaming the new LVs on the target node")
2296       rename_new_to_old = [(new, old.physical_id)
2297                            for old, new in zip(old_lvs, new_lvs)]
2298       result = self.rpc.call_blockdev_rename(self.target_node,
2299                                              rename_new_to_old)
2300       result.Raise("Can't rename new LVs on node %s" % self.target_node)
2301
2302       # Intermediate steps of in memory modifications
2303       for old, new in zip(old_lvs, new_lvs):
2304         new.logical_id = old.logical_id
2305         self.cfg.SetDiskID(new, self.target_node)
2306
2307       # We need to modify old_lvs so that removal later removes the
2308       # right LVs, not the newly added ones; note that old_lvs is a
2309       # copy here
2310       for disk in old_lvs:
2311         disk.logical_id = ren_fn(disk, temp_suffix)
2312         self.cfg.SetDiskID(disk, self.target_node)
2313
2314       # Now that the new lvs have the old name, we can add them to the device
2315       self.lu.LogInfo("Adding new mirror component on %s", self.target_node)
2316       result = self.rpc.call_blockdev_addchildren(self.target_node,
2317                                                   (dev, self.instance), new_lvs)
2318       msg = result.fail_msg
2319       if msg:
2320         for new_lv in new_lvs:
2321           msg2 = self.rpc.call_blockdev_remove(self.target_node,
2322                                                new_lv).fail_msg
2323           if msg2:
2324             self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
2325                                hint=("cleanup manually the unused logical"
2326                                      "volumes"))
2327         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
2328
2329     cstep = itertools.count(5)
2330
2331     if self.early_release:
2332       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2333       self._RemoveOldStorage(self.target_node, iv_names)
2334       # TODO: Check if releasing locks early still makes sense
2335       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2336     else:
2337       # Release all resource locks except those used by the instance
2338       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2339                    keep=self.node_secondary_ip.keys())
2340
2341     # Release all node locks while waiting for sync
2342     ReleaseLocks(self.lu, locking.LEVEL_NODE)
2343
2344     # TODO: Can the instance lock be downgraded here? Take the optional disk
2345     # shutdown in the caller into consideration.
2346
2347     # Wait for sync
2348     # This can fail as the old devices are degraded and _WaitForSync
2349     # does a combined result over all disks, so we don't check its return value
2350     self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2351     WaitForSync(self.lu, self.instance)
2352
2353     # Check all devices manually
2354     self._CheckDevices(self.instance.primary_node, iv_names)
2355
2356     # Step: remove old storage
2357     if not self.early_release:
2358       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2359       self._RemoveOldStorage(self.target_node, iv_names)
2360
2361   def _ExecDrbd8Secondary(self, feedback_fn):
2362     """Replace the secondary node for DRBD 8.
2363
2364     The algorithm for replace is quite complicated:
2365       - for all disks of the instance:
2366         - create new LVs on the new node with same names
2367         - shutdown the drbd device on the old secondary
2368         - disconnect the drbd network on the primary
2369         - create the drbd device on the new secondary
2370         - network attach the drbd on the primary, using an artifice:
2371           the drbd code for Attach() will connect to the network if it
2372           finds a device which is connected to the good local disks but
2373           not network enabled
2374       - wait for sync across all devices
2375       - remove all disks from the old secondary
2376
2377     Failures are not very well handled.
2378
2379     """
2380     steps_total = 6
2381
2382     pnode = self.instance.primary_node
2383
2384     # Step: check device activation
2385     self.lu.LogStep(1, steps_total, "Check device existence")
2386     self._CheckDisksExistence([self.instance.primary_node])
2387     self._CheckVolumeGroup([self.instance.primary_node])
2388
2389     # Step: check other node consistency
2390     self.lu.LogStep(2, steps_total, "Check peer consistency")
2391     self._CheckDisksConsistency(self.instance.primary_node, True, True)
2392
2393     # Step: create new storage
2394     self.lu.LogStep(3, steps_total, "Allocate new storage")
2395     disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2396     excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
2397     for idx, dev in enumerate(disks):
2398       self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
2399                       (self.new_node, idx))
2400       # we pass force_create=True to force LVM creation
2401       for new_lv in dev.children:
2402         try:
2403           _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
2404                                True, GetInstanceInfoText(self.instance), False,
2405                                excl_stor)
2406         except errors.DeviceCreationError, e:
2407           raise errors.OpExecError("Can't create block device: %s" % e.message)
2408
2409     # Step 4: dbrd minors and drbd setups changes
2410     # after this, we must manually remove the drbd minors on both the
2411     # error and the success paths
2412     self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2413     minors = self.cfg.AllocateDRBDMinor([self.new_node
2414                                          for dev in self.instance.disks],
2415                                         self.instance.name)
2416     logging.debug("Allocated minors %r", minors)
2417
2418     iv_names = {}
2419     for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
2420       self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
2421                       (self.new_node, idx))
2422       # create new devices on new_node; note that we create two IDs:
2423       # one without port, so the drbd will be activated without
2424       # networking information on the new node at this stage, and one
2425       # with network, for the latter activation in step 4
2426       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
2427       if self.instance.primary_node == o_node1:
2428         p_minor = o_minor1
2429       else:
2430         assert self.instance.primary_node == o_node2, "Three-node instance?"
2431         p_minor = o_minor2
2432
2433       new_alone_id = (self.instance.primary_node, self.new_node, None,
2434                       p_minor, new_minor, o_secret)
2435       new_net_id = (self.instance.primary_node, self.new_node, o_port,
2436                     p_minor, new_minor, o_secret)
2437
2438       iv_names[idx] = (dev, dev.children, new_net_id)
2439       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
2440                     new_net_id)
2441       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
2442                               logical_id=new_alone_id,
2443                               children=dev.children,
2444                               size=dev.size,
2445                               params={})
2446       (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
2447                                             self.cfg)
2448       try:
2449         CreateSingleBlockDev(self.lu, self.new_node, self.instance,
2450                              anno_new_drbd,
2451                              GetInstanceInfoText(self.instance), False,
2452                              excl_stor)
2453       except errors.GenericError:
2454         self.cfg.ReleaseDRBDMinors(self.instance.name)
2455         raise
2456
2457     # We have new devices, shutdown the drbd on the old secondary
2458     for idx, dev in enumerate(self.instance.disks):
2459       self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
2460       self.cfg.SetDiskID(dev, self.target_node)
2461       msg = self.rpc.call_blockdev_shutdown(self.target_node,
2462                                             (dev, self.instance)).fail_msg
2463       if msg:
2464         self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
2465                            "node: %s" % (idx, msg),
2466                            hint=("Please cleanup this device manually as"
2467                                  " soon as possible"))
2468
2469     self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2470     result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
2471                                                self.instance.disks)[pnode]
2472
2473     msg = result.fail_msg
2474     if msg:
2475       # detaches didn't succeed (unlikely)
2476       self.cfg.ReleaseDRBDMinors(self.instance.name)
2477       raise errors.OpExecError("Can't detach the disks from the network on"
2478                                " old node: %s" % (msg,))
2479
2480     # if we managed to detach at least one, we update all the disks of
2481     # the instance to point to the new secondary
2482     self.lu.LogInfo("Updating instance configuration")
2483     for dev, _, new_logical_id in iv_names.itervalues():
2484       dev.logical_id = new_logical_id
2485       self.cfg.SetDiskID(dev, self.instance.primary_node)
2486
2487     self.cfg.Update(self.instance, feedback_fn)
2488
2489     # Release all node locks (the configuration has been updated)
2490     ReleaseLocks(self.lu, locking.LEVEL_NODE)
2491
2492     # and now perform the drbd attach
2493     self.lu.LogInfo("Attaching primary drbds to new secondary"
2494                     " (standalone => connected)")
2495     result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2496                                             self.new_node],
2497                                            self.node_secondary_ip,
2498                                            (self.instance.disks, self.instance),
2499                                            self.instance.name,
2500                                            False)
2501     for to_node, to_result in result.items():
2502       msg = to_result.fail_msg
2503       if msg:
2504         self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
2505                            to_node, msg,
2506                            hint=("please do a gnt-instance info to see the"
2507                                  " status of disks"))
2508
2509     cstep = itertools.count(5)
2510
2511     if self.early_release:
2512       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2513       self._RemoveOldStorage(self.target_node, iv_names)
2514       # TODO: Check if releasing locks early still makes sense
2515       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2516     else:
2517       # Release all resource locks except those used by the instance
2518       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2519                    keep=self.node_secondary_ip.keys())
2520
2521     # TODO: Can the instance lock be downgraded here? Take the optional disk
2522     # shutdown in the caller into consideration.
2523
2524     # Wait for sync
2525     # This can fail as the old devices are degraded and _WaitForSync
2526     # does a combined result over all disks, so we don't check its return value
2527     self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2528     WaitForSync(self.lu, self.instance)
2529
2530     # Check all devices manually
2531     self._CheckDevices(self.instance.primary_node, iv_names)
2532
2533     # Step: remove old storage
2534     if not self.early_release:
2535       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2536       self._RemoveOldStorage(self.target_node, iv_names)