lib/cmdlib/instance_storage.py @ 763ad5be

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with storage of instances."""
23

    
24
import itertools
25
import logging
26
import os
27
import time
28

    
29
from ganeti import compat
30
from ganeti import constants
31
from ganeti import errors
32
from ganeti import ht
33
from ganeti import locking
34
from ganeti.masterd import iallocator
35
from ganeti import objects
36
from ganeti import utils
37
from ganeti import opcodes
38
from ganeti import rpc
39
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
40
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
41
  _AnnotateDiskParams, _CheckIAllocatorOrNode, _ExpandNodeName, \
42
  _CheckNodeOnline, _CheckInstanceNodeGroups, _CheckInstanceState, \
43
  _IsExclusiveStorageEnabledNode, _FindFaultyInstanceDisks
44
from ganeti.cmdlib.instance_utils import _GetInstanceInfoText, \
45
  _CopyLockList, _ReleaseLocks, _CheckNodeVmCapable, \
46
  _BuildInstanceHookEnvByObject, _CheckNodeNotDrained, _CheckTargetNodeIPolicy
47

    
48
import ganeti.masterd.instance
49

    
50

    
51
_DISK_TEMPLATE_NAME_PREFIX = {
52
  constants.DT_PLAIN: "",
53
  constants.DT_RBD: ".rbd",
54
  constants.DT_EXT: ".ext",
55
  }
56

    
57

    
58
_DISK_TEMPLATE_DEVICE_TYPE = {
59
  constants.DT_PLAIN: constants.LD_LV,
60
  constants.DT_FILE: constants.LD_FILE,
61
  constants.DT_SHARED_FILE: constants.LD_FILE,
62
  constants.DT_BLOCK: constants.LD_BLOCKDEV,
63
  constants.DT_RBD: constants.LD_RBD,
64
  constants.DT_EXT: constants.LD_EXT,
65
  }
66

    
67

    
68
def _CreateSingleBlockDev(lu, node, instance, device, info, force_open,
69
                          excl_stor):
70
  """Create a single block device on a given node.
71

72
  This will not recurse over children of the device, so they must be
73
  created in advance.
74

75
  @param lu: the lu on whose behalf we execute
76
  @param node: the node on which to create the device
77
  @type instance: L{objects.Instance}
78
  @param instance: the instance which owns the device
79
  @type device: L{objects.Disk}
80
  @param device: the device to create
81
  @param info: the extra 'metadata' we should attach to the device
82
      (this will be represented as an LVM tag)
83
  @type force_open: boolean
84
  @param force_open: this parameter will be passed to the
85
      L{backend.BlockdevCreate} function where it specifies
86
      whether we run on primary or not, and it affects both
87
      the child assembly and the device's own Open() execution
88
  @type excl_stor: boolean
89
  @param excl_stor: Whether exclusive_storage is active for the node
90

91
  """
92
  lu.cfg.SetDiskID(device, node)
93
  result = lu.rpc.call_blockdev_create(node, device, device.size,
94
                                       instance.name, force_open, info,
95
                                       excl_stor)
96
  result.Raise("Can't create block device %s on"
97
               " node %s for instance %s" % (device, node, instance.name))
98
  if device.physical_id is None:
99
    device.physical_id = result.payload
100

    
101

    
102
def _CreateBlockDevInner(lu, node, instance, device, force_create,
103
                         info, force_open, excl_stor):
104
  """Create a tree of block devices on a given node.
105

106
  If this device type has to be created on secondaries, create it and
107
  all its children.
108

109
  If not, just recurse to children keeping the same 'force' value.
110

111
  @attention: The device has to be annotated already.
112

113
  @param lu: the lu on whose behalf we execute
114
  @param node: the node on which to create the device
115
  @type instance: L{objects.Instance}
116
  @param instance: the instance which owns the device
117
  @type device: L{objects.Disk}
118
  @param device: the device to create
119
  @type force_create: boolean
120
  @param force_create: whether to force creation of this device; this
121
      will be changed to True whenever we find a device which has
122
      CreateOnSecondary() attribute
123
  @param info: the extra 'metadata' we should attach to the device
124
      (this will be represented as an LVM tag)
125
  @type force_open: boolean
126
  @param force_open: this parameter will be passed to the
127
      L{backend.BlockdevCreate} function where it specifies
128
      whether we run on primary or not, and it affects both
129
      the child assembly and the device's own Open() execution
130
  @type excl_stor: boolean
131
  @param excl_stor: Whether exclusive_storage is active for the node
132

133
  @return: list of created devices
134
  """
135
  created_devices = []
136
  try:
137
    if device.CreateOnSecondary():
138
      force_create = True
139

    
140
    if device.children:
141
      for child in device.children:
142
        devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
143
                                    info, force_open, excl_stor)
144
        created_devices.extend(devs)
145

    
146
    if not force_create:
147
      return created_devices
148

    
149
    _CreateSingleBlockDev(lu, node, instance, device, info, force_open,
150
                          excl_stor)
151
    # The device has been completely created, so there is no point in keeping
152
    # its subdevices in the list. We just add the device itself instead.
153
    created_devices = [(node, device)]
154
    return created_devices
155

    
156
  except errors.DeviceCreationError, e:
157
    e.created_devices.extend(created_devices)
158
    raise e
159
  except errors.OpExecError, e:
160
    raise errors.DeviceCreationError(str(e), created_devices)
161

    
162

    
163
def _IsExclusiveStorageEnabledNodeName(cfg, nodename):
164
  """Whether exclusive_storage is in effect for the given node.
165

166
  @type cfg: L{config.ConfigWriter}
167
  @param cfg: The cluster configuration
168
  @type nodename: string
169
  @param nodename: The node
170
  @rtype: bool
171
  @return: The effective value of exclusive_storage
172
  @raise errors.OpPrereqError: if no node exists with the given name
173

174
  """
175
  ni = cfg.GetNodeInfo(nodename)
176
  if ni is None:
177
    raise errors.OpPrereqError("Invalid node name %s" % nodename,
178
                               errors.ECODE_NOENT)
179
  return _IsExclusiveStorageEnabledNode(cfg, ni)
180

    
181

    
182
def _CreateBlockDev(lu, node, instance, device, force_create, info,
183
                    force_open):
184
  """Wrapper around L{_CreateBlockDevInner}.
185

186
  This method annotates the root device first.
187

188
  """
189
  (disk,) = _AnnotateDiskParams(instance, [device], lu.cfg)
190
  excl_stor = _IsExclusiveStorageEnabledNodeName(lu.cfg, node)
191
  return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
192
                              force_open, excl_stor)
193

    
194

    
195
def _CreateDisks(lu, instance, to_skip=None, target_node=None):
196
  """Create all disks for an instance.
197

198
  This abstracts away some work from AddInstance.
199

200
  @type lu: L{LogicalUnit}
201
  @param lu: the logical unit on whose behalf we execute
202
  @type instance: L{objects.Instance}
203
  @param instance: the instance whose disks we should create
204
  @type to_skip: list
205
  @param to_skip: list of indices to skip
206
  @type target_node: string
207
  @param target_node: if passed, overrides the target node for creation
208
  @rtype: boolean
209
  @return: the success of the creation
210

211
  """
212
  info = _GetInstanceInfoText(instance)
213
  if target_node is None:
214
    pnode = instance.primary_node
215
    all_nodes = instance.all_nodes
216
  else:
217
    pnode = target_node
218
    all_nodes = [pnode]
219

    
220
  if instance.disk_template in constants.DTS_FILEBASED:
221
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
222
    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
223

    
224
    result.Raise("Failed to create directory '%s' on"
225
                 " node %s" % (file_storage_dir, pnode))
226

    
227
  disks_created = []
228
  # Note: this needs to be kept in sync with adding of disks in
229
  # LUInstanceSetParams
230
  for idx, device in enumerate(instance.disks):
231
    if to_skip and idx in to_skip:
232
      continue
233
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
234
    #HARDCODE
235
    for node in all_nodes:
236
      f_create = node == pnode
237
      try:
238
        _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
239
        disks_created.append((node, device))
240
      except errors.OpExecError:
241
        logging.warning("Creating disk %s for instance '%s' failed",
242
                        idx, instance.name)
243
      except errors.DeviceCreationError, e:
244
        logging.warning("Creating disk %s for instance '%s' failed",
245
                        idx, instance.name)
246
        disks_created.extend(e.created_devices)
247
        for (node, disk) in disks_created:
248
          lu.cfg.SetDiskID(disk, node)
249
          result = lu.rpc.call_blockdev_remove(node, disk)
250
          if result.fail_msg:
251
            logging.warning("Failed to remove newly-created disk %s on node %s:"
252
                            " %s", device, node, result.fail_msg)
253
        raise errors.OpExecError(e.message)
254

    
255

    
256
def _ComputeDiskSizePerVG(disk_template, disks):
257
  """Compute disk size requirements in the volume group
258

259
  """
260
  def _compute(disks, payload):
261
    """Universal algorithm.
262

263
    """
264
    vgs = {}
265
    for disk in disks:
266
      vgs[disk[constants.IDISK_VG]] = \
267
        vgs.get(disk[constants.IDISK_VG], 0) + \
        disk[constants.IDISK_SIZE] + payload
268

    
269
    return vgs
270

    
271
  # Required free disk space as a function of disk and swap space
272
  req_size_dict = {
273
    constants.DT_DISKLESS: {},
274
    constants.DT_PLAIN: _compute(disks, 0),
275
    # 128 MB are added for drbd metadata for each disk
276
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
277
    constants.DT_FILE: {},
278
    constants.DT_SHARED_FILE: {},
279
    }
280

    
281
  if disk_template not in req_size_dict:
282
    raise errors.ProgrammerError("Disk template '%s' size requirement"
283
                                 " is unknown" % disk_template)
284

    
285
  return req_size_dict[disk_template]
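
# Worked example (illustrative only, not used by the code): for two disks of
# 1024 MiB and 2048 MiB in volume group "xenvg", the plain template needs
# 3072 MiB in that VG, while drbd adds constants.DRBD_META_SIZE (128 MiB)
# per disk:
#
#   disks = [{constants.IDISK_VG: "xenvg", constants.IDISK_SIZE: 1024},
#            {constants.IDISK_VG: "xenvg", constants.IDISK_SIZE: 2048}]
#   _ComputeDiskSizePerVG(constants.DT_PLAIN, disks)  # {"xenvg": 3072}
#   _ComputeDiskSizePerVG(constants.DT_DRBD8, disks)  # {"xenvg": 3328}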
286

    
287

    
288
def _ComputeDisks(op, default_vg):
289
  """Computes the instance disks.
290

291
  @param op: The instance opcode
292
  @param default_vg: The default_vg to assume
293

294
  @return: The computed disks
295

296
  """
297
  disks = []
298
  for disk in op.disks:
299
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
300
    if mode not in constants.DISK_ACCESS_SET:
301
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
302
                                 mode, errors.ECODE_INVAL)
303
    size = disk.get(constants.IDISK_SIZE, None)
304
    if size is None:
305
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
306
    try:
307
      size = int(size)
308
    except (TypeError, ValueError):
309
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
310
                                 errors.ECODE_INVAL)
311

    
312
    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
313
    if ext_provider and op.disk_template != constants.DT_EXT:
314
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
315
                                 " disk template, not %s" %
316
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
317
                                  op.disk_template), errors.ECODE_INVAL)
318

    
319
    data_vg = disk.get(constants.IDISK_VG, default_vg)
320
    name = disk.get(constants.IDISK_NAME, None)
321
    if name is not None and name.lower() == constants.VALUE_NONE:
322
      name = None
323
    new_disk = {
324
      constants.IDISK_SIZE: size,
325
      constants.IDISK_MODE: mode,
326
      constants.IDISK_VG: data_vg,
327
      constants.IDISK_NAME: name,
328
      }
329

    
330
    if constants.IDISK_METAVG in disk:
331
      new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG]
332
    if constants.IDISK_ADOPT in disk:
333
      new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]
334

    
335
    # For extstorage, demand the `provider' option and add any
336
    # additional parameters (ext-params) to the dict
337
    if op.disk_template == constants.DT_EXT:
338
      if ext_provider:
339
        new_disk[constants.IDISK_PROVIDER] = ext_provider
340
        for key in disk:
341
          if key not in constants.IDISK_PARAMS:
342
            new_disk[key] = disk[key]
343
      else:
344
        raise errors.OpPrereqError("Missing provider for template '%s'" %
345
                                   constants.DT_EXT, errors.ECODE_INVAL)
346

    
347
    disks.append(new_disk)
348

    
349
  return disks
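
# Example (illustrative sketch, not part of the module): an opcode whose
# ``disks`` attribute is [{constants.IDISK_SIZE: 1024}] is normalized by
# _ComputeDisks(op, "xenvg") to
#   [{constants.IDISK_SIZE: 1024, constants.IDISK_MODE: constants.DISK_RDWR,
#     constants.IDISK_VG: "xenvg", constants.IDISK_NAME: None}]
# i.e. missing mode/vg/name fall back to their defaults ("xenvg" is a
# hypothetical default volume group).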
350

    
351

    
352
def _CheckRADOSFreeSpace():
353
  """Compute disk size requirements inside the RADOS cluster.
354

355
  """
356
  # For the RADOS cluster we assume there is always enough space.
357
  pass
358

    
359

    
360
def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
361
                         iv_name, p_minor, s_minor):
362
  """Generate a drbd8 device complete with its children.
363

364
  """
365
  assert len(vgnames) == len(names) == 2
366
  port = lu.cfg.AllocatePort()
367
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
368

    
369
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
370
                          logical_id=(vgnames[0], names[0]),
371
                          params={})
372
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
373
  dev_meta = objects.Disk(dev_type=constants.LD_LV,
374
                          size=constants.DRBD_META_SIZE,
375
                          logical_id=(vgnames[1], names[1]),
376
                          params={})
377
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
378
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
379
                          logical_id=(primary, secondary, port,
380
                                      p_minor, s_minor,
381
                                      shared_secret),
382
                          children=[dev_data, dev_meta],
383
                          iv_name=iv_name, params={})
384
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
385
  return drbd_dev
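
# Illustrative call (hypothetical node, VG and LV names):
#
#   disk = _GenerateDRBD8Branch(lu, "node1", "node2", 1024,
#                               ["xenvg", "xenvg"], ["d0_data", "d0_meta"],
#                               "disk/0", 0, 1)
#
# The result is a LD_DRBD8 device whose logical_id carries the two nodes, an
# allocated port, both minors and the shared secret, with two LD_LV children:
# the data volume (1024 MiB) and the metadata volume of
# constants.DRBD_META_SIZE.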
386

    
387

    
388
def _GenerateDiskTemplate(
389
  lu, template_name, instance_name, primary_node, secondary_nodes,
390
  disk_info, file_storage_dir, file_driver, base_index,
391
  feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
392
  _req_shr_file_storage=opcodes.RequireSharedFileStorage):
393
  """Generate the entire disk layout for a given template type.
394

395
  """
396
  vgname = lu.cfg.GetVGName()
397
  disk_count = len(disk_info)
398
  disks = []
399

    
400
  if template_name == constants.DT_DISKLESS:
401
    pass
402
  elif template_name == constants.DT_DRBD8:
403
    if len(secondary_nodes) != 1:
404
      raise errors.ProgrammerError("Wrong template configuration")
405
    remote_node = secondary_nodes[0]
406
    minors = lu.cfg.AllocateDRBDMinor(
407
      [primary_node, remote_node] * len(disk_info), instance_name)
408

    
409
    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
410
                                                       full_disk_params)
411
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
412

    
413
    names = []
414
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
415
                                               for i in range(disk_count)]):
416
      names.append(lv_prefix + "_data")
417
      names.append(lv_prefix + "_meta")
418
    for idx, disk in enumerate(disk_info):
419
      disk_index = idx + base_index
420
      data_vg = disk.get(constants.IDISK_VG, vgname)
421
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
422
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
423
                                      disk[constants.IDISK_SIZE],
424
                                      [data_vg, meta_vg],
425
                                      names[idx * 2:idx * 2 + 2],
426
                                      "disk/%d" % disk_index,
427
                                      minors[idx * 2], minors[idx * 2 + 1])
428
      disk_dev.mode = disk[constants.IDISK_MODE]
429
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
430
      disks.append(disk_dev)
431
  else:
432
    if secondary_nodes:
433
      raise errors.ProgrammerError("Wrong template configuration")
434

    
435
    if template_name == constants.DT_FILE:
436
      _req_file_storage()
437
    elif template_name == constants.DT_SHARED_FILE:
438
      _req_shr_file_storage()
439

    
440
    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
441
    if name_prefix is None:
442
      names = None
443
    else:
444
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
445
                                        (name_prefix, base_index + i)
446
                                        for i in range(disk_count)])
447

    
448
    if template_name == constants.DT_PLAIN:
449

    
450
      def logical_id_fn(idx, _, disk):
451
        vg = disk.get(constants.IDISK_VG, vgname)
452
        return (vg, names[idx])
453

    
454
    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
455
      logical_id_fn = \
456
        lambda _, disk_index, disk: (file_driver,
457
                                     "%s/disk%d" % (file_storage_dir,
458
                                                    disk_index))
459
    elif template_name == constants.DT_BLOCK:
460
      logical_id_fn = \
461
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
462
                                       disk[constants.IDISK_ADOPT])
463
    elif template_name == constants.DT_RBD:
464
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
465
    elif template_name == constants.DT_EXT:
466
      def logical_id_fn(idx, _, disk):
467
        provider = disk.get(constants.IDISK_PROVIDER, None)
468
        if provider is None:
469
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
470
                                       " not found", constants.DT_EXT,
471
                                       constants.IDISK_PROVIDER)
472
        return (provider, names[idx])
473
    else:
474
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
475

    
476
    dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
477

    
478
    for idx, disk in enumerate(disk_info):
479
      params = {}
480
      # Only for the Ext template add disk_info to params
481
      if template_name == constants.DT_EXT:
482
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
483
        for key in disk:
484
          if key not in constants.IDISK_PARAMS:
485
            params[key] = disk[key]
486
      disk_index = idx + base_index
487
      size = disk[constants.IDISK_SIZE]
488
      feedback_fn("* disk %s, size %s" %
489
                  (disk_index, utils.FormatUnit(size, "h")))
490
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
491
                              logical_id=logical_id_fn(idx, disk_index, disk),
492
                              iv_name="disk/%d" % disk_index,
493
                              mode=disk[constants.IDISK_MODE],
494
                              params=params)
495
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
496
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
497
      disks.append(disk_dev)
498

    
499
  return disks
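
# Summary of the logical_id layout per template (as built above):
#   DT_PLAIN:                 (vg, generated LV name)
#   DT_FILE / DT_SHARED_FILE: (file_driver, "<file_storage_dir>/disk<N>")
#   DT_BLOCK:                 (constants.BLOCKDEV_DRIVER_MANUAL, adopted dev)
#   DT_RBD:                   ("rbd", generated name)
#   DT_EXT:                   (provider, generated name)
# DT_DRBD8 disks are instead assembled by _GenerateDRBD8Branch.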
500

    
501

    
502
class LUInstanceRecreateDisks(LogicalUnit):
503
  """Recreate an instance's missing disks.
504

505
  """
506
  HPATH = "instance-recreate-disks"
507
  HTYPE = constants.HTYPE_INSTANCE
508
  REQ_BGL = False
509

    
510
  _MODIFYABLE = compat.UniqueFrozenset([
511
    constants.IDISK_SIZE,
512
    constants.IDISK_MODE,
513
    ])
514

    
515
  # New or changed disk parameters may have different semantics
516
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
517
    constants.IDISK_ADOPT,
518

    
519
    # TODO: Implement support changing VG while recreating
520
    constants.IDISK_VG,
521
    constants.IDISK_METAVG,
522
    constants.IDISK_PROVIDER,
523
    constants.IDISK_NAME,
524
    ]))
525

    
526
  def _RunAllocator(self):
527
    """Run the allocator based on input opcode.
528

529
    """
530
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
531

    
532
    # FIXME
533
    # The allocator should actually run in "relocate" mode, but current
534
    # allocators don't support relocating all the nodes of an instance at
535
    # the same time. As a workaround we use "allocate" mode, but this is
536
    # suboptimal for two reasons:
537
    # - The instance name passed to the allocator is present in the list of
538
    #   existing instances, so there could be a conflict within the
539
    #   internal structures of the allocator. This doesn't happen with the
540
    #   current allocators, but it's a liability.
541
    # - The allocator counts the resources used by the instance twice: once
542
    #   because the instance exists already, and once because it tries to
543
    #   allocate a new instance.
544
    # The allocator could choose some of the nodes on which the instance is
545
    # running, but that's not a problem. If the instance nodes are broken,
546
    # they should already be marked as drained or offline, and hence
547
    # skipped by the allocator. If instance disks have been lost for other
548
    # reasons, then recreating the disks on the same nodes should be fine.
549
    disk_template = self.instance.disk_template
550
    spindle_use = be_full[constants.BE_SPINDLE_USE]
551
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
552
                                        disk_template=disk_template,
553
                                        tags=list(self.instance.GetTags()),
554
                                        os=self.instance.os,
555
                                        nics=[{}],
556
                                        vcpus=be_full[constants.BE_VCPUS],
557
                                        memory=be_full[constants.BE_MAXMEM],
558
                                        spindle_use=spindle_use,
559
                                        disks=[{constants.IDISK_SIZE: d.size,
560
                                                constants.IDISK_MODE: d.mode}
561
                                               for d in self.instance.disks],
562
                                        hypervisor=self.instance.hypervisor,
563
                                        node_whitelist=None)
564
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
565

    
566
    ial.Run(self.op.iallocator)
567

    
568
    assert req.RequiredNodes() == len(self.instance.all_nodes)
569

    
570
    if not ial.success:
571
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
572
                                 " %s" % (self.op.iallocator, ial.info),
573
                                 errors.ECODE_NORES)
574

    
575
    self.op.nodes = ial.result
576
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
577
                 self.op.instance_name, self.op.iallocator,
578
                 utils.CommaJoin(ial.result))
579

    
580
  def CheckArguments(self):
581
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
582
      # Normalize and convert deprecated list of disk indices
583
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
584

    
585
    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
586
    if duplicates:
587
      raise errors.OpPrereqError("Some disks have been specified more than"
588
                                 " once: %s" % utils.CommaJoin(duplicates),
589
                                 errors.ECODE_INVAL)
590

    
591
    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
592
    # when neither iallocator nor nodes are specified
593
    if self.op.iallocator or self.op.nodes:
594
      _CheckIAllocatorOrNode(self, "iallocator", "nodes")
595

    
596
    for (idx, params) in self.op.disks:
597
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
598
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
599
      if unsupported:
600
        raise errors.OpPrereqError("Parameters for disk %s try to change"
601
                                   " unmodifyable parameter(s): %s" %
602
                                   (idx, utils.CommaJoin(unsupported)),
603
                                   errors.ECODE_INVAL)
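
  # Example (illustrative): a deprecated ``disks`` value of [2, 0] is
  # normalized by CheckArguments to [(0, {}), (2, {})], i.e. sorted
  # (index, params) pairs with empty parameter overrides.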
604

    
605
  def ExpandNames(self):
606
    self._ExpandAndLockInstance()
607
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
608

    
609
    if self.op.nodes:
610
      self.op.nodes = [_ExpandNodeName(self.cfg, n) for n in self.op.nodes]
611
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
612
    else:
613
      self.needed_locks[locking.LEVEL_NODE] = []
614
      if self.op.iallocator:
615
        # iallocator will select a new node in the same group
616
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
617
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
618

    
619
    self.needed_locks[locking.LEVEL_NODE_RES] = []
620

    
621
  def DeclareLocks(self, level):
622
    if level == locking.LEVEL_NODEGROUP:
623
      assert self.op.iallocator is not None
624
      assert not self.op.nodes
625
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
626
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
627
      # Lock the primary group used by the instance optimistically; this
628
      # requires going via the node before it's locked, requiring
629
      # verification later on
630
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
631
        self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)
632

    
633
    elif level == locking.LEVEL_NODE:
634
      # If an allocator is used, then we lock all the nodes in the current
635
      # instance group, as we don't know yet which ones will be selected;
636
      # if we replace the nodes without using an allocator, locks are
637
      # already declared in ExpandNames; otherwise, we need to lock all the
638
      # instance nodes for disk re-creation
639
      if self.op.iallocator:
640
        assert not self.op.nodes
641
        assert not self.needed_locks[locking.LEVEL_NODE]
642
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
643

    
644
        # Lock member nodes of the group of the primary node
645
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
646
          self.needed_locks[locking.LEVEL_NODE].extend(
647
            self.cfg.GetNodeGroup(group_uuid).members)
648

    
649
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
650
      elif not self.op.nodes:
651
        self._LockInstancesNodes(primary_only=False)
652
    elif level == locking.LEVEL_NODE_RES:
653
      # Copy node locks
654
      self.needed_locks[locking.LEVEL_NODE_RES] = \
655
        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
656

    
657
  def BuildHooksEnv(self):
658
    """Build hooks env.
659

660
    This runs on master, primary and secondary nodes of the instance.
661

662
    """
663
    return _BuildInstanceHookEnvByObject(self, self.instance)
664

    
665
  def BuildHooksNodes(self):
666
    """Build hooks nodes.
667

668
    """
669
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
670
    return (nl, nl)
671

    
672
  def CheckPrereq(self):
673
    """Check prerequisites.
674

675
    This checks that the instance is in the cluster and is not running.
676

677
    """
678
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
679
    assert instance is not None, \
680
      "Cannot retrieve locked instance %s" % self.op.instance_name
681
    if self.op.nodes:
682
      if len(self.op.nodes) != len(instance.all_nodes):
683
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
684
                                   " %d replacement nodes were specified" %
685
                                   (instance.name, len(instance.all_nodes),
686
                                    len(self.op.nodes)),
687
                                   errors.ECODE_INVAL)
688
      assert instance.disk_template != constants.DT_DRBD8 or \
689
             len(self.op.nodes) == 2
690
      assert instance.disk_template != constants.DT_PLAIN or \
691
             len(self.op.nodes) == 1
692
      primary_node = self.op.nodes[0]
693
    else:
694
      primary_node = instance.primary_node
695
    if not self.op.iallocator:
696
      _CheckNodeOnline(self, primary_node)
697

    
698
    if instance.disk_template == constants.DT_DISKLESS:
699
      raise errors.OpPrereqError("Instance '%s' has no disks" %
700
                                 self.op.instance_name, errors.ECODE_INVAL)
701

    
702
    # Verify if node group locks are still correct
703
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
704
    if owned_groups:
705
      # Node group locks are acquired only for the primary node (and only
706
      # when the allocator is used)
707
      _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
708
                               primary_only=True)
709

    
710
    # if we replace nodes *and* the old primary is offline, we don't
711
    # check the instance state
712
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
713
    if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
714
      _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
715
                          msg="cannot recreate disks")
716

    
717
    if self.op.disks:
718
      self.disks = dict(self.op.disks)
719
    else:
720
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
721

    
722
    maxidx = max(self.disks.keys())
723
    if maxidx >= len(instance.disks):
724
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
725
                                 errors.ECODE_INVAL)
726

    
727
    if ((self.op.nodes or self.op.iallocator) and
728
         sorted(self.disks.keys()) != range(len(instance.disks))):
729
      raise errors.OpPrereqError("Can't recreate disks partially and"
730
                                 " change the nodes at the same time",
731
                                 errors.ECODE_INVAL)
732

    
733
    self.instance = instance
734

    
735
    if self.op.iallocator:
736
      self._RunAllocator()
737
      # Release unneeded node and node resource locks
738
      _ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
739
      _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
740
      _ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
741

    
742
    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
743

    
744
  def Exec(self, feedback_fn):
745
    """Recreate the disks.
746

747
    """
748
    instance = self.instance
749

    
750
    assert (self.owned_locks(locking.LEVEL_NODE) ==
751
            self.owned_locks(locking.LEVEL_NODE_RES))
752

    
753
    to_skip = []
754
    mods = [] # keeps track of needed changes
755

    
756
    for idx, disk in enumerate(instance.disks):
757
      try:
758
        changes = self.disks[idx]
759
      except KeyError:
760
        # Disk should not be recreated
761
        to_skip.append(idx)
762
        continue
763

    
764
      # update secondaries for disks, if needed
765
      if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
766
        # need to update the nodes and minors
767
        assert len(self.op.nodes) == 2
768
        assert len(disk.logical_id) == 6 # otherwise disk internals
769
                                         # have changed
770
        (_, _, old_port, _, _, old_secret) = disk.logical_id
771
        new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
772
        new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
773
                  new_minors[0], new_minors[1], old_secret)
774
        assert len(disk.logical_id) == len(new_id)
775
      else:
776
        new_id = None
777

    
778
      mods.append((idx, new_id, changes))
779

    
780
    # now that we have passed all asserts above, we can apply the mods
781
    # in a single run (to avoid partial changes)
782
    for idx, new_id, changes in mods:
783
      disk = instance.disks[idx]
784
      if new_id is not None:
785
        assert disk.dev_type == constants.LD_DRBD8
786
        disk.logical_id = new_id
787
      if changes:
788
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
789
                    mode=changes.get(constants.IDISK_MODE, None))
790

    
791
    # change primary node, if needed
792
    if self.op.nodes:
793
      instance.primary_node = self.op.nodes[0]
794
      self.LogWarning("Changing the instance's nodes, you will have to"
795
                      " remove any disks left on the older nodes manually")
796

    
797
    if self.op.nodes:
798
      self.cfg.Update(instance, feedback_fn)
799

    
800
    # All touched nodes must be locked
801
    mylocks = self.owned_locks(locking.LEVEL_NODE)
802
    assert mylocks.issuperset(frozenset(instance.all_nodes))
803
    _CreateDisks(self, instance, to_skip=to_skip)
804

    
805

    
806
def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
807
  """Checks if nodes have enough free disk space in the specified VG.
808

809
  This function checks if all given nodes have the needed amount of
810
  free disk. In case any node has less disk or we cannot get the
811
  information from the node, this function raises an OpPrereqError
812
  exception.
813

814
  @type lu: C{LogicalUnit}
815
  @param lu: a logical unit from which we get configuration data
816
  @type nodenames: C{list}
817
  @param nodenames: the list of node names to check
818
  @type vg: C{str}
819
  @param vg: the volume group to check
820
  @type requested: C{int}
821
  @param requested: the amount of disk in MiB to check for
822
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
823
      or we cannot check the node
824

825
  """
826
  es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
827
  nodeinfo = lu.rpc.call_node_info(nodenames, [vg], None, es_flags)
828
  for node in nodenames:
829
    info = nodeinfo[node]
830
    info.Raise("Cannot get current information from node %s" % node,
831
               prereq=True, ecode=errors.ECODE_ENVIRON)
832
    (_, (vg_info, ), _) = info.payload
833
    vg_free = vg_info.get("vg_free", None)
834
    if not isinstance(vg_free, int):
835
      raise errors.OpPrereqError("Can't compute free disk space on node"
836
                                 " %s for vg %s, result was '%s'" %
837
                                 (node, vg, vg_free), errors.ECODE_ENVIRON)
838
    if requested > vg_free:
839
      raise errors.OpPrereqError("Not enough disk space on target node %s"
840
                                 " vg %s: required %d MiB, available %d MiB" %
841
                                 (node, vg, requested, vg_free),
842
                                 errors.ECODE_NORES)
843

    
844

    
845
def _CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
846
  """Checks if nodes have enough free disk space in all the VGs.
847

848
  This function checks if all given nodes have the needed amount of
849
  free disk. In case any node has less disk or we cannot get the
850
  information from the node, this function raises an OpPrereqError
851
  exception.
852

853
  @type lu: C{LogicalUnit}
854
  @param lu: a logical unit from which we get configuration data
855
  @type nodenames: C{list}
856
  @param nodenames: the list of node names to check
857
  @type req_sizes: C{dict}
858
  @param req_sizes: the hash of vg and corresponding amount of disk in
859
      MiB to check for
860
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
861
      or we cannot check the node
862

863
  """
864
  for vg, req_size in req_sizes.items():
865
    _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
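
# Example (illustrative, hypothetical node and VG names): req_sizes as
# produced by _ComputeDiskSizePerVG triggers one per-VG check on every node:
#
#   _CheckNodesFreeDiskPerVG(lu, ["node1", "node2"],
#                            {"xenvg": 3072, "metavg": 256})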
866

    
867

    
868
def _DiskSizeInBytesToMebibytes(lu, size):
869
  """Converts a disk size in bytes to mebibytes.
870

871
  Warns and rounds up if the size isn't an even multiple of 1 MiB.
872

873
  """
874
  (mib, remainder) = divmod(size, 1024 * 1024)
875

    
876
  if remainder != 0:
877
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
878
                  " to not overwrite existing data (%s bytes will not be"
879
                  " wiped)", (1024 * 1024) - remainder)
880
    mib += 1
881

    
882
  return mib
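
# Example: 1 GiB converts to exactly 1024 MiB, while a single extra byte
# rounds up (with a warning) so existing data is not overwritten:
#
#   _DiskSizeInBytesToMebibytes(lu, 1024 * 1024 * 1024)      # 1024
#   _DiskSizeInBytesToMebibytes(lu, 1024 * 1024 * 1024 + 1)  # 1025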
883

    
884

    
885
def _CalcEta(time_taken, written, total_size):
886
  """Calculates the ETA based on size written and total size.
887

888
  @param time_taken: The time taken so far
889
  @param written: amount written so far
890
  @param total_size: The total size of data to be written
891
  @return: The remaining time in seconds
892

893
  """
894
  avg_time = time_taken / float(written)
895
  return (total_size - written) * avg_time
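
# Example: after 30 seconds with 512 MiB of 2048 MiB written, the average
# rate is 30/512 seconds per MiB, so the remaining 1536 MiB need about 90 s:
#
#   _CalcEta(30.0, 512, 2048)  # 90.0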
896

    
897

    
898
def _WipeDisks(lu, instance, disks=None):
899
  """Wipes instance disks.
900

901
  @type lu: L{LogicalUnit}
902
  @param lu: the logical unit on whose behalf we execute
903
  @type instance: L{objects.Instance}
904
  @param instance: the instance whose disks we should wipe
905
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
906
  @param disks: Disk details; tuple contains disk index, disk object and the
907
    start offset
908

909
  """
910
  node = instance.primary_node
911

    
912
  if disks is None:
913
    disks = [(idx, disk, 0)
914
             for (idx, disk) in enumerate(instance.disks)]
915

    
916
  for (_, device, _) in disks:
917
    lu.cfg.SetDiskID(device, node)
918

    
919
  logging.info("Pausing synchronization of disks of instance '%s'",
920
               instance.name)
921
  result = lu.rpc.call_blockdev_pause_resume_sync(node,
922
                                                  (map(compat.snd, disks),
923
                                                   instance),
924
                                                  True)
925
  result.Raise("Failed to pause disk synchronization on node '%s'" % node)
926

    
927
  for idx, success in enumerate(result.payload):
928
    if not success:
929
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
930
                   " failed", idx, instance.name)
931

    
932
  try:
933
    for (idx, device, offset) in disks:
934
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
935
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
936
      wipe_chunk_size = \
937
        int(min(constants.MAX_WIPE_CHUNK,
938
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
939

    
940
      size = device.size
941
      last_output = 0
942
      start_time = time.time()
943

    
944
      if offset == 0:
945
        info_text = ""
946
      else:
947
        info_text = (" (from %s to %s)" %
948
                     (utils.FormatUnit(offset, "h"),
949
                      utils.FormatUnit(size, "h")))
950

    
951
      lu.LogInfo("* Wiping disk %s%s", idx, info_text)
952

    
953
      logging.info("Wiping disk %d for instance %s on node %s using"
954
                   " chunk size %s", idx, instance.name, node, wipe_chunk_size)
955

    
956
      while offset < size:
957
        wipe_size = min(wipe_chunk_size, size - offset)
958

    
959
        logging.debug("Wiping disk %d, offset %s, chunk %s",
960
                      idx, offset, wipe_size)
961

    
962
        result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
963
                                           wipe_size)
964
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
965
                     (idx, offset, wipe_size))
966

    
967
        now = time.time()
968
        offset += wipe_size
969
        if now - last_output >= 60:
970
          eta = _CalcEta(now - start_time, offset, size)
971
          lu.LogInfo(" - done: %.1f%% ETA: %s",
972
                     offset / float(size) * 100, utils.FormatSeconds(eta))
973
          last_output = now
974
  finally:
975
    logging.info("Resuming synchronization of disks for instance '%s'",
976
                 instance.name)
977

    
978
    result = lu.rpc.call_blockdev_pause_resume_sync(node,
979
                                                    (map(compat.snd, disks),
980
                                                     instance),
981
                                                    False)
982

    
983
    if result.fail_msg:
984
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
985
                    node, result.fail_msg)
986
    else:
987
      for idx, success in enumerate(result.payload):
988
        if not success:
989
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
990
                        " failed", idx, instance.name)
991

    
992

    
993
def _ExpandCheckDisks(instance, disks):
994
  """Return the instance disks selected by the disks list
995

996
  @type disks: list of L{objects.Disk} or None
997
  @param disks: selected disks
998
  @rtype: list of L{objects.Disk}
999
  @return: selected instance disks to act on
1000

1001
  """
1002
  if disks is None:
1003
    return instance.disks
1004
  else:
1005
    if not set(disks).issubset(instance.disks):
1006
      raise errors.ProgrammerError("Can only act on disks belonging to the"
1007
                                   " target instance")
1008
    return disks
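
# Example (illustrative): _ExpandCheckDisks(instance, None) selects all of
# instance.disks, while passing a subset such as [instance.disks[0]] limits
# the caller to that disk; disks not owned by the instance raise
# ProgrammerError.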
1009

    
1010

    
1011
def _WaitForSync(lu, instance, disks=None, oneshot=False):
1012
  """Sleep and poll for an instance's disk to sync.
1013

1014
  """
1015
  if not instance.disks or disks is not None and not disks:
1016
    return True
1017

    
1018
  disks = _ExpandCheckDisks(instance, disks)
1019

    
1020
  if not oneshot:
1021
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1022

    
1023
  node = instance.primary_node
1024

    
1025
  for dev in disks:
1026
    lu.cfg.SetDiskID(dev, node)
1027

    
1028
  # TODO: Convert to utils.Retry
1029

    
1030
  retries = 0
1031
  degr_retries = 10 # in seconds, as we sleep 1 second each time
1032
  while True:
1033
    max_time = 0
1034
    done = True
1035
    cumul_degraded = False
1036
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
1037
    msg = rstats.fail_msg
1038
    if msg:
1039
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1040
      retries += 1
1041
      if retries >= 10:
1042
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1043
                                 " aborting." % node)
1044
      time.sleep(6)
1045
      continue
1046
    rstats = rstats.payload
1047
    retries = 0
1048
    for i, mstat in enumerate(rstats):
1049
      if mstat is None:
1050
        lu.LogWarning("Can't compute data for node %s/%s",
1051
                      node, disks[i].iv_name)
1052
        continue
1053

    
1054
      cumul_degraded = (cumul_degraded or
1055
                        (mstat.is_degraded and mstat.sync_percent is None))
1056
      if mstat.sync_percent is not None:
1057
        done = False
1058
        if mstat.estimated_time is not None:
1059
          rem_time = ("%s remaining (estimated)" %
1060
                      utils.FormatSeconds(mstat.estimated_time))
1061
          max_time = mstat.estimated_time
1062
        else:
1063
          rem_time = "no time estimate"
1064
        lu.LogInfo("- device %s: %5.2f%% done, %s",
1065
                   disks[i].iv_name, mstat.sync_percent, rem_time)
1066

    
1067
    # if we're done but degraded, let's do a few small retries, to
1068
    # make sure we see a stable and not transient situation; therefore
1069
    # we force restart of the loop
1070
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
1071
      logging.info("Degraded disks found, %d retries left", degr_retries)
1072
      degr_retries -= 1
1073
      time.sleep(1)
1074
      continue
1075

    
1076
    if done or oneshot:
1077
      break
1078

    
1079
    time.sleep(min(60, max_time))
1080

    
1081
  if done:
1082
    lu.LogInfo("Instance %s's disks are in sync", instance.name)
1083

    
1084
  return not cumul_degraded
1085

    
1086

    
1087
def _ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1088
  """Shutdown block devices of an instance.
1089

1090
  This does the shutdown on all nodes of the instance.
1091

1092
  If ignore_primary is true, errors on the primary node are
1093
  ignored.
1094

1095
  """
1096
  all_result = True
1097
  disks = _ExpandCheckDisks(instance, disks)
1098

    
1099
  for disk in disks:
1100
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1101
      lu.cfg.SetDiskID(top_disk, node)
1102
      result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
1103
      msg = result.fail_msg
1104
      if msg:
1105
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1106
                      disk.iv_name, node, msg)
1107
        if ((node == instance.primary_node and not ignore_primary) or
1108
            (node != instance.primary_node and not result.offline)):
1109
          all_result = False
1110
  return all_result
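
# Example (illustrative): _ShutdownInstanceDisks(lu, instance) returns False
# when a shutdown fails on the primary node, whereas passing
# ignore_primary=True only counts failures on online secondary nodes.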
1111

    
1112

    
1113
def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1114
  """Shutdown block devices of an instance.
1115

1116
  This function checks if an instance is running, before calling
1117
  _ShutdownInstanceDisks.
1118

1119
  """
1120
  _CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1121
  _ShutdownInstanceDisks(lu, instance, disks=disks)
1122

    
1123

    
1124
def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1125
                           ignore_size=False):
1126
  """Prepare the block devices for an instance.
1127

1128
  This sets up the block devices on all nodes.
1129

1130
  @type lu: L{LogicalUnit}
1131
  @param lu: the logical unit on whose behalf we execute
1132
  @type instance: L{objects.Instance}
1133
  @param instance: the instance for whose disks we assemble
1134
  @type disks: list of L{objects.Disk} or None
1135
  @param disks: which disks to assemble (or all, if None)
1136
  @type ignore_secondaries: boolean
1137
  @param ignore_secondaries: if true, errors on secondary nodes
1138
      won't result in an error return from the function
1139
  @type ignore_size: boolean
1140
  @param ignore_size: if true, the current known size of the disk
1141
      will not be used during the disk activation, useful for cases
1142
      when the size is wrong
1143
  @return: False if the operation failed, otherwise a list of
1144
      (host, instance_visible_name, node_visible_name)
1145
      with the mapping from node devices to instance devices
1146

1147
  """
1148
  device_info = []
1149
  disks_ok = True
1150
  iname = instance.name
1151
  disks = _ExpandCheckDisks(instance, disks)
1152

    
1153
  # With the two passes mechanism we try to reduce the window of
1154
  # opportunity for the race condition of switching DRBD to primary
1155
  # before handshaking occurred, but we do not eliminate it
1156

    
1157
  # The proper fix would be to wait (with some limits) until the
1158
  # connection has been made and drbd transitions from WFConnection
1159
  # into any other network-connected state (Connected, SyncTarget,
1160
  # SyncSource, etc.)
1161

    
1162
  # 1st pass, assemble on all nodes in secondary mode
1163
  for idx, inst_disk in enumerate(disks):
1164
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1165
      if ignore_size:
1166
        node_disk = node_disk.Copy()
1167
        node_disk.UnsetSize()
1168
      lu.cfg.SetDiskID(node_disk, node)
1169
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1170
                                             False, idx)
1171
      msg = result.fail_msg
1172
      if msg:
1173
        is_offline_secondary = (node in instance.secondary_nodes and
1174
                                result.offline)
1175
        lu.LogWarning("Could not prepare block device %s on node %s"
1176
                      " (is_primary=False, pass=1): %s",
1177
                      inst_disk.iv_name, node, msg)
1178
        if not (ignore_secondaries or is_offline_secondary):
1179
          disks_ok = False
1180

    
1181
  # FIXME: race condition on drbd migration to primary
1182

    
1183
  # 2nd pass, do only the primary node
1184
  for idx, inst_disk in enumerate(disks):
1185
    dev_path = None
1186

    
1187
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1188
      if node != instance.primary_node:
1189
        continue
1190
      if ignore_size:
1191
        node_disk = node_disk.Copy()
1192
        node_disk.UnsetSize()
1193
      lu.cfg.SetDiskID(node_disk, node)
1194
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1195
                                             True, idx)
1196
      msg = result.fail_msg
1197
      if msg:
1198
        lu.LogWarning("Could not prepare block device %s on node %s"
1199
                      " (is_primary=True, pass=2): %s",
1200
                      inst_disk.iv_name, node, msg)
1201
        disks_ok = False
1202
      else:
1203
        dev_path = result.payload
1204

    
1205
    device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
1206

    
1207
  # leave the disks configured for the primary node
1208
  # this is a workaround that would be fixed better by
1209
  # improving the logical/physical id handling
1210
  for disk in disks:
1211
    lu.cfg.SetDiskID(disk, instance.primary_node)
1212

    
1213
  return disks_ok, device_info
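
# Example of the return value (hypothetical device paths): on success the
# function returns
#
#   (True, [("node1.example.com", "disk/0", "/dev/drbd0"),
#           ("node1.example.com", "disk/1", "/dev/drbd1")])
#
# i.e. one (primary node, iv_name, device path) triple per assembled disk.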
1214

    
1215

    
1216
def _StartInstanceDisks(lu, instance, force):
1217
  """Start the disks of an instance.
1218

1219
  """
1220
  disks_ok, _ = _AssembleInstanceDisks(lu, instance,
1221
                                       ignore_secondaries=force)
1222
  if not disks_ok:
1223
    _ShutdownInstanceDisks(lu, instance)
1224
    if force is not None and not force:
1225
      lu.LogWarning("",
1226
                    hint=("If the message above refers to a secondary node,"
1227
                          " you can retry the operation using '--force'"))
1228
    raise errors.OpExecError("Disk consistency error")
1229

    
1230

    
1231
class LUInstanceGrowDisk(LogicalUnit):
1232
  """Grow a disk of an instance.
1233

1234
  """
1235
  HPATH = "disk-grow"
1236
  HTYPE = constants.HTYPE_INSTANCE
1237
  REQ_BGL = False
1238

    
1239
  def ExpandNames(self):
1240
    self._ExpandAndLockInstance()
1241
    self.needed_locks[locking.LEVEL_NODE] = []
1242
    self.needed_locks[locking.LEVEL_NODE_RES] = []
1243
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1244
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
1245

    
1246
  def DeclareLocks(self, level):
1247
    if level == locking.LEVEL_NODE:
1248
      self._LockInstancesNodes()
1249
    elif level == locking.LEVEL_NODE_RES:
1250
      # Copy node locks
1251
      self.needed_locks[locking.LEVEL_NODE_RES] = \
1252
        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1253

    
1254
  def BuildHooksEnv(self):
1255
    """Build hooks env.
1256

1257
    This runs on the master, the primary and all the secondaries.
1258

1259
    """
1260
    env = {
1261
      "DISK": self.op.disk,
1262
      "AMOUNT": self.op.amount,
1263
      "ABSOLUTE": self.op.absolute,
1264
      }
1265
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
1266
    return env
1267

    
1268
  def BuildHooksNodes(self):
1269
    """Build hooks nodes.
1270

1271
    """
1272
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1273
    return (nl, nl)
1274

    
1275
  def CheckPrereq(self):
1276
    """Check prerequisites.
1277

1278
    This checks that the instance is in the cluster.
1279

1280
    """
1281
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1282
    assert instance is not None, \
1283
      "Cannot retrieve locked instance %s" % self.op.instance_name
1284
    nodenames = list(instance.all_nodes)
1285
    for node in nodenames:
1286
      _CheckNodeOnline(self, node)
1287

    
1288
    self.instance = instance
1289

    
1290
    if instance.disk_template not in constants.DTS_GROWABLE:
1291
      raise errors.OpPrereqError("Instance's disk layout does not support"
1292
                                 " growing", errors.ECODE_INVAL)
1293

    
1294
    self.disk = instance.FindDisk(self.op.disk)
1295

    
1296
    if self.op.absolute:
1297
      self.target = self.op.amount
1298
      self.delta = self.target - self.disk.size
1299
      if self.delta < 0:
1300
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
1301
                                   "current disk size (%s)" %
1302
                                   (utils.FormatUnit(self.target, "h"),
1303
                                    utils.FormatUnit(self.disk.size, "h")),
1304
                                   errors.ECODE_STATE)
1305
    else:
1306
      self.delta = self.op.amount
1307
      self.target = self.disk.size + self.delta
1308
      if self.delta < 0:
1309
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
1310
                                   utils.FormatUnit(self.delta, "h"),
1311
                                   errors.ECODE_INVAL)
1312

    
1313
    self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))
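
  # Example (illustrative): for a disk that is currently 8192 MiB, an opcode
  # with absolute=True and amount=10240 yields delta=2048 and target=10240;
  # the relative form (absolute=False, amount=2048) yields the same values.
  # A negative delta is rejected in either case.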
1314

    
1315
  def _CheckDiskSpace(self, nodenames, req_vgspace):
1316
    template = self.instance.disk_template
1317
    if template not in (constants.DTS_NO_FREE_SPACE_CHECK):
1318
      # TODO: check the free disk space for file, when that feature will be
1319
      # supported
1320
      nodes = map(self.cfg.GetNodeInfo, nodenames)
1321
      es_nodes = filter(lambda n: _IsExclusiveStorageEnabledNode(self.cfg, n),
1322
                        nodes)
1323
      if es_nodes:
1324
        # With exclusive storage we need to do something smarter than just
1325
        # looking at free space; for now, let's simply abort the operation.
1326
        raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
1327
                                   " is enabled", errors.ECODE_STATE)
1328
      _CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)
1329

    
1330
  def Exec(self, feedback_fn):
1331
    """Execute disk grow.
1332

1333
    """
1334
    instance = self.instance
1335
    disk = self.disk
1336

    
1337
    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1338
    assert (self.owned_locks(locking.LEVEL_NODE) ==
1339
            self.owned_locks(locking.LEVEL_NODE_RES))
1340

    
1341
    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1342

    
1343
    disks_ok, _ = _AssembleInstanceDisks(self, self.instance, disks=[disk])
1344
    if not disks_ok:
1345
      raise errors.OpExecError("Cannot activate block device to grow")
1346

    
1347
    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1348
                (self.op.disk, instance.name,
1349
                 utils.FormatUnit(self.delta, "h"),
1350
                 utils.FormatUnit(self.target, "h")))
1351

    
1352
    # First run all grow ops in dry-run mode
1353
    for node in instance.all_nodes:
1354
      self.cfg.SetDiskID(disk, node)
1355
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1356
                                           True, True)
1357
      result.Raise("Dry-run grow request failed to node %s" % node)
1358

    
1359
    if wipe_disks:
1360
      # Get disk size from primary node for wiping
1361
      result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk])
1362
      result.Raise("Failed to retrieve disk size from node '%s'" %
1363
                   instance.primary_node)
1364

    
1365
      (disk_size_in_bytes, ) = result.payload
1366

    
1367
      if disk_size_in_bytes is None:
1368
        raise errors.OpExecError("Failed to retrieve disk size from primary"
1369
                                 " node '%s'" % instance.primary_node)
1370

    
1371
      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
1372

    
1373
      assert old_disk_size >= disk.size, \
1374
        ("Retrieved disk size too small (got %s, should be at least %s)" %
1375
         (old_disk_size, disk.size))
1376
    else:
1377
      old_disk_size = None
1378

    
1379
    # We know that (as far as we can test) operations across different
1380
    # nodes will succeed; time to run it for real on the backing storage
1381
    for node in instance.all_nodes:
1382
      self.cfg.SetDiskID(disk, node)
1383
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1384
                                           False, True)
1385
      result.Raise("Grow request failed to node %s" % node)
1386

    
1387
    # And now execute it for logical storage, on the primary node
1388
    node = instance.primary_node
1389
    self.cfg.SetDiskID(disk, node)
1390
    result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1391
                                         False, False)
1392
    result.Raise("Grow request failed to node %s" % node)
1393

    
1394
    disk.RecordGrow(self.delta)
1395
    self.cfg.Update(instance, feedback_fn)
1396

    
1397
    # Changes have been recorded, release node lock
1398
    _ReleaseLocks(self, locking.LEVEL_NODE)
1399

    
1400
    # Downgrade lock while waiting for sync
1401
    self.glm.downgrade(locking.LEVEL_INSTANCE)
1402

    
1403
    assert wipe_disks ^ (old_disk_size is None)
1404

    
1405
    if wipe_disks:
1406
      assert instance.disks[self.op.disk] == disk
1407

    
1408
      # Wipe newly added disk space
1409
      _WipeDisks(self, instance,
1410
                 disks=[(self.op.disk, disk, old_disk_size)])
1411

    
1412
    if self.op.wait_for_sync:
1413
      disk_abort = not _WaitForSync(self, instance, disks=[disk])
1414
      if disk_abort:
1415
        self.LogWarning("Disk syncing has not returned a good status; check"
1416
                        " the instance")
1417
      if instance.admin_state != constants.ADMINST_UP:
1418
        _SafeShutdownInstanceDisks(self, instance, disks=[disk])
1419
    elif instance.admin_state != constants.ADMINST_UP:
1420
      self.LogWarning("Not shutting down the disk even if the instance is"
1421
                      " not supposed to be running because no wait for"
1422
                      " sync mode was requested")
1423

    
1424
    assert self.owned_locks(locking.LEVEL_NODE_RES)
1425
    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
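
# Illustrative sketch (editor addition, not used by the code above): the
# sizing rule applied by LUInstanceGrowDisk.CheckPrereq, reduced to a pure
# function.  With absolute=True the requested amount is the new total size
# and the delta is derived from it; otherwise the amount is the increment.
# The function name and the ValueError are hypothetical stand-ins for the
# LU's OpPrereqError handling; sizes are in mebibytes.
def _ExampleComputeGrowTarget(current_size, amount, absolute):
  """Return (target, delta) for a disk grow request.

  """
  if absolute:
    target = amount
    delta = target - current_size
    if delta < 0:
      raise ValueError("Requested size (%s) is smaller than current disk"
                       " size (%s)" % (target, current_size))
  else:
    delta = amount
    target = current_size + delta
    if delta < 0:
      raise ValueError("Requested increment (%s) is negative" % delta)
  return (target, delta)

# For example, growing a 10 GiB disk by 2 GiB can be requested either way:
#   _ExampleComputeGrowTarget(10240, 2048, False) == (12288, 2048)
#   _ExampleComputeGrowTarget(10240, 12288, True) == (12288, 2048)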
1426

    
1427

    
1428
class LUInstanceReplaceDisks(LogicalUnit):
1429
  """Replace the disks of an instance.
1430

1431
  """
1432
  HPATH = "mirrors-replace"
1433
  HTYPE = constants.HTYPE_INSTANCE
1434
  REQ_BGL = False
1435

    
1436
  def CheckArguments(self):
1437
    """Check arguments.
1438

1439
    """
1440
    remote_node = self.op.remote_node
1441
    ialloc = self.op.iallocator
1442
    if self.op.mode == constants.REPLACE_DISK_CHG:
1443
      if remote_node is None and ialloc is None:
1444
        raise errors.OpPrereqError("When changing the secondary either an"
1445
                                   " iallocator script must be used or the"
1446
                                   " new node given", errors.ECODE_INVAL)
1447
      else:
1448
        _CheckIAllocatorOrNode(self, "iallocator", "remote_node")
1449

    
1450
    elif remote_node is not None or ialloc is not None:
1451
      # Not replacing the secondary
1452
      raise errors.OpPrereqError("The iallocator and new node options can"
1453
                                 " only be used when changing the"
1454
                                 " secondary node", errors.ECODE_INVAL)
1455

    
1456
  def ExpandNames(self):
1457
    self._ExpandAndLockInstance()
1458

    
1459
    assert locking.LEVEL_NODE not in self.needed_locks
1460
    assert locking.LEVEL_NODE_RES not in self.needed_locks
1461
    assert locking.LEVEL_NODEGROUP not in self.needed_locks
1462

    
1463
    assert self.op.iallocator is None or self.op.remote_node is None, \
1464
      "Conflicting options"
1465

    
1466
    if self.op.remote_node is not None:
1467
      self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
1468

    
1469
      # Warning: do not remove the locking of the new secondary here
1470
      # unless DRBD8.AddChildren is changed to work in parallel;
1471
      # currently it doesn't since parallel invocations of
1472
      # FindUnusedMinor will conflict
1473
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
1474
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1475
    else:
1476
      self.needed_locks[locking.LEVEL_NODE] = []
1477
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1478

    
1479
      if self.op.iallocator is not None:
1480
        # iallocator will select a new node in the same group
1481
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
1482
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1483

    
1484
    self.needed_locks[locking.LEVEL_NODE_RES] = []
1485

    
1486
    self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
1487
                                   self.op.iallocator, self.op.remote_node,
1488
                                   self.op.disks, self.op.early_release,
1489
                                   self.op.ignore_ipolicy)
1490

    
1491
    self.tasklets = [self.replacer]
1492

    
1493
  def DeclareLocks(self, level):
1494
    if level == locking.LEVEL_NODEGROUP:
1495
      assert self.op.remote_node is None
1496
      assert self.op.iallocator is not None
1497
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
1498

    
1499
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
1500
      # Lock all groups used by instance optimistically; this requires going
1501
      # via the node before it's locked, requiring verification later on
1502
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
1503
        self.cfg.GetInstanceNodeGroups(self.op.instance_name)
1504

    
1505
    elif level == locking.LEVEL_NODE:
1506
      if self.op.iallocator is not None:
1507
        assert self.op.remote_node is None
1508
        assert not self.needed_locks[locking.LEVEL_NODE]
1509
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
1510

    
1511
        # Lock member nodes of all locked groups
1512
        self.needed_locks[locking.LEVEL_NODE] = \
1513
          [node_name
1514
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1515
           for node_name in self.cfg.GetNodeGroup(group_uuid).members]
1516
      else:
1517
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1518

    
1519
        self._LockInstancesNodes()
1520

    
1521
    elif level == locking.LEVEL_NODE_RES:
1522
      # Reuse node locks
1523
      self.needed_locks[locking.LEVEL_NODE_RES] = \
1524
        self.needed_locks[locking.LEVEL_NODE]
1525

    
1526
  def BuildHooksEnv(self):
1527
    """Build hooks env.
1528

1529
    This runs on the master, the primary and all the secondaries.
1530

1531
    """
1532
    instance = self.replacer.instance
1533
    env = {
1534
      "MODE": self.op.mode,
1535
      "NEW_SECONDARY": self.op.remote_node,
1536
      "OLD_SECONDARY": instance.secondary_nodes[0],
1537
      }
1538
    env.update(_BuildInstanceHookEnvByObject(self, instance))
1539
    return env
1540

    
1541
  def BuildHooksNodes(self):
1542
    """Build hooks nodes.
1543

1544
    """
1545
    instance = self.replacer.instance
1546
    nl = [
1547
      self.cfg.GetMasterNode(),
1548
      instance.primary_node,
1549
      ]
1550
    if self.op.remote_node is not None:
1551
      nl.append(self.op.remote_node)
1552
    return nl, nl
1553

    
1554
  def CheckPrereq(self):
1555
    """Check prerequisites.
1556

1557
    """
1558
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
1559
            self.op.iallocator is None)
1560

    
1561
    # Verify if node group locks are still correct
1562
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
1563
    if owned_groups:
1564
      _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
1565

    
1566
    return LogicalUnit.CheckPrereq(self)
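
# Illustrative sketch (editor addition, not used by the code above): the
# argument rules enforced by LUInstanceReplaceDisks.CheckArguments.  When
# changing the secondary (REPLACE_DISK_CHG) exactly one of remote_node and
# iallocator must be given; in every other mode neither may be given.  The
# function and the chg_mode default are hypothetical; the real code compares
# against constants.REPLACE_DISK_CHG and raises OpPrereqError instead of
# returning a message.
def _ExampleCheckReplaceDisksArgs(mode, remote_node, iallocator,
                                  chg_mode="replace_new_secondary"):
  """Return None if the option combination is valid, else an error text.

  """
  if mode == chg_mode:
    if remote_node is None and iallocator is None:
      return ("When changing the secondary either an iallocator script must"
              " be used or the new node given")
    if remote_node is not None and iallocator is not None:
      return "Give either the iallocator or the new node, not both"
    return None
  if remote_node is not None or iallocator is not None:
    return ("The iallocator and new node options can only be used when"
            " changing the secondary node")
  return None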
1567

    
1568

    
1569
class LUInstanceActivateDisks(NoHooksLU):
1570
  """Bring up an instance's disks.
1571

1572
  """
1573
  REQ_BGL = False
1574

    
1575
  def ExpandNames(self):
1576
    self._ExpandAndLockInstance()
1577
    self.needed_locks[locking.LEVEL_NODE] = []
1578
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1579

    
1580
  def DeclareLocks(self, level):
1581
    if level == locking.LEVEL_NODE:
1582
      self._LockInstancesNodes()
1583

    
1584
  def CheckPrereq(self):
1585
    """Check prerequisites.
1586

1587
    This checks that the instance is in the cluster.
1588

1589
    """
1590
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1591
    assert self.instance is not None, \
1592
      "Cannot retrieve locked instance %s" % self.op.instance_name
1593
    _CheckNodeOnline(self, self.instance.primary_node)
1594

    
1595
  def Exec(self, feedback_fn):
1596
    """Activate the disks.
1597

1598
    """
1599
    disks_ok, disks_info = \
1600
              _AssembleInstanceDisks(self, self.instance,
1601
                                     ignore_size=self.op.ignore_size)
1602
    if not disks_ok:
1603
      raise errors.OpExecError("Cannot activate block devices")
1604

    
1605
    if self.op.wait_for_sync:
1606
      if not _WaitForSync(self, self.instance):
1607
        raise errors.OpExecError("Some disks of the instance are degraded!")
1608

    
1609
    return disks_info
1610

    
1611

    
1612
class LUInstanceDeactivateDisks(NoHooksLU):
1613
  """Shutdown an instance's disks.
1614

1615
  """
1616
  REQ_BGL = False
1617

    
1618
  def ExpandNames(self):
1619
    self._ExpandAndLockInstance()
1620
    self.needed_locks[locking.LEVEL_NODE] = []
1621
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1622

    
1623
  def DeclareLocks(self, level):
1624
    if level == locking.LEVEL_NODE:
1625
      self._LockInstancesNodes()
1626

    
1627
  def CheckPrereq(self):
1628
    """Check prerequisites.
1629

1630
    This checks that the instance is in the cluster.
1631

1632
    """
1633
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1634
    assert self.instance is not None, \
1635
      "Cannot retrieve locked instance %s" % self.op.instance_name
1636

    
1637
  def Exec(self, feedback_fn):
1638
    """Deactivate the disks
1639

1640
    """
1641
    instance = self.instance
1642
    if self.op.force:
1643
      _ShutdownInstanceDisks(self, instance)
1644
    else:
1645
      _SafeShutdownInstanceDisks(self, instance)
1646

    
1647

    
1648
def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
1649
                               ldisk=False):
1650
  """Check that mirrors are not degraded.
1651

1652
  @attention: The device has to be annotated already.
1653

1654
  The ldisk parameter, if True, will change the test from the
1655
  is_degraded attribute (which represents overall non-ok status for
1656
  the device(s)) to the ldisk (representing the local storage status).
1657

1658
  """
1659
  lu.cfg.SetDiskID(dev, node)
1660

    
1661
  result = True
1662

    
1663
  if on_primary or dev.AssembleOnSecondary():
1664
    rstats = lu.rpc.call_blockdev_find(node, dev)
1665
    msg = rstats.fail_msg
1666
    if msg:
1667
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1668
      result = False
1669
    elif not rstats.payload:
1670
      lu.LogWarning("Can't find disk on node %s", node)
1671
      result = False
1672
    else:
1673
      if ldisk:
1674
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
1675
      else:
1676
        result = result and not rstats.payload.is_degraded
1677

    
1678
  if dev.children:
1679
    for child in dev.children:
1680
      result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
1681
                                                     on_primary)
1682

    
1683
  return result
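
# Illustrative sketch (editor addition, not used by the code above): the
# health test _CheckDiskConsistencyInner applies to a single blockdev_find
# payload.  With ldisk=True only the local-disk status counts, otherwise the
# overall is_degraded flag is used.  The lds_okay parameter is a stand-in for
# constants.LDS_OKAY, and payload is assumed to expose the same two
# attributes as the RPC payload used above.
def _ExampleMirrorIsHealthy(payload, ldisk, lds_okay):
  """Return True if a single device status counts as healthy.

  """
  if payload is None:
    # equivalent to the "Can't find disk on node" case above
    return False
  if ldisk:
    return payload.ldisk_status == lds_okay
  return not payload.is_degraded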
1684

    
1685

    
1686
def _CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
1687
  """Wrapper around L{_CheckDiskConsistencyInner}.
1688

1689
  """
1690
  (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
1691
  return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
1692
                                    ldisk=ldisk)
1693

    
1694

    
1695
def _BlockdevFind(lu, node, dev, instance):
1696
  """Wrapper around call_blockdev_find to annotate diskparams.
1697

1698
  @param lu: A reference to the lu object
1699
  @param node: The node to call out
1700
  @param dev: The device to find
1701
  @param instance: The instance object the device belongs to
1702
  @returns The result of the rpc call
1703

1704
  """
1705
  (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
1706
  return lu.rpc.call_blockdev_find(node, disk)
1707

    
1708

    
1709
def _GenerateUniqueNames(lu, exts):
1710
  """Generate a suitable LV name.
1711

1712
  This will generate a logical volume name for the given instance.
1713

1714
  """
1715
  results = []
1716
  for val in exts:
1717
    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
1718
    results.append("%s%s" % (new_id, val))
1719
  return results
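
# Illustrative sketch (editor addition, not used by the code above): how the
# unique IDs generated here combine with the per-disk suffixes used by
# TLReplaceDisks._CreateNewStorage below ("<uuid>.disk<N>_data" and
# "<uuid>.disk<N>_meta").  The function name is hypothetical.
def _ExampleLvNamePair(unique_id, disk_index):
  """Return the (data, meta) LV name pair for one replaced disk.

  """
  return tuple("%s.disk%d_%s" % (unique_id, disk_index, suffix)
               for suffix in ("data", "meta"))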
1720

    
1721

    
1722
class TLReplaceDisks(Tasklet):
1723
  """Replaces disks for an instance.
1724

1725
  Note: Locking is not within the scope of this class.
1726

1727
  """
1728
  def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
1729
               disks, early_release, ignore_ipolicy):
1730
    """Initializes this class.
1731

1732
    """
1733
    Tasklet.__init__(self, lu)
1734

    
1735
    # Parameters
1736
    self.instance_name = instance_name
1737
    self.mode = mode
1738
    self.iallocator_name = iallocator_name
1739
    self.remote_node = remote_node
1740
    self.disks = disks
1741
    self.early_release = early_release
1742
    self.ignore_ipolicy = ignore_ipolicy
1743

    
1744
    # Runtime data
1745
    self.instance = None
1746
    self.new_node = None
1747
    self.target_node = None
1748
    self.other_node = None
1749
    self.remote_node_info = None
1750
    self.node_secondary_ip = None
1751

    
1752
  @staticmethod
1753
  def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
1754
    """Compute a new secondary node using an IAllocator.
1755

1756
    """
1757
    req = iallocator.IAReqRelocate(name=instance_name,
1758
                                   relocate_from=list(relocate_from))
1759
    ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
1760

    
1761
    ial.Run(iallocator_name)
1762

    
1763
    if not ial.success:
1764
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
1765
                                 " %s" % (iallocator_name, ial.info),
1766
                                 errors.ECODE_NORES)
1767

    
1768
    remote_node_name = ial.result[0]
1769

    
1770
    lu.LogInfo("Selected new secondary for instance '%s': %s",
1771
               instance_name, remote_node_name)
1772

    
1773
    return remote_node_name
1774

    
1775
  def _FindFaultyDisks(self, node_name):
1776
    """Wrapper for L{_FindFaultyInstanceDisks}.
1777

1778
    """
1779
    return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
1780
                                    node_name, True)
1781

    
1782
  def _CheckDisksActivated(self, instance):
1783
    """Checks if the instance disks are activated.
1784

1785
    @param instance: The instance to check disks
1786
    @return: True if they are activated, False otherwise
1787

1788
    """
1789
    nodes = instance.all_nodes
1790

    
1791
    for idx, dev in enumerate(instance.disks):
1792
      for node in nodes:
1793
        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
1794
        self.cfg.SetDiskID(dev, node)
1795

    
1796
        result = _BlockdevFind(self, node, dev, instance)
1797

    
1798
        if result.offline:
1799
          continue
1800
        elif result.fail_msg or not result.payload:
1801
          return False
1802

    
1803
    return True
1804

    
1805
  def CheckPrereq(self):
1806
    """Check prerequisites.
1807

1808
    This checks that the instance is in the cluster.
1809

1810
    """
1811
    self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
1812
    assert instance is not None, \
1813
      "Cannot retrieve locked instance %s" % self.instance_name
1814

    
1815
    if instance.disk_template != constants.DT_DRBD8:
1816
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
1817
                                 " instances", errors.ECODE_INVAL)
1818

    
1819
    if len(instance.secondary_nodes) != 1:
1820
      raise errors.OpPrereqError("The instance has a strange layout,"
1821
                                 " expected one secondary but found %d" %
1822
                                 len(instance.secondary_nodes),
1823
                                 errors.ECODE_FAULT)
1824

    
1825
    instance = self.instance
1826
    secondary_node = instance.secondary_nodes[0]
1827

    
1828
    if self.iallocator_name is None:
1829
      remote_node = self.remote_node
1830
    else:
1831
      remote_node = self._RunAllocator(self.lu, self.iallocator_name,
1832
                                       instance.name, instance.secondary_nodes)
1833

    
1834
    if remote_node is None:
1835
      self.remote_node_info = None
1836
    else:
1837
      assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \
1838
             "Remote node '%s' is not locked" % remote_node
1839

    
1840
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
1841
      assert self.remote_node_info is not None, \
1842
        "Cannot retrieve locked node %s" % remote_node
1843

    
1844
    if remote_node == self.instance.primary_node:
1845
      raise errors.OpPrereqError("The specified node is the primary node of"
1846
                                 " the instance", errors.ECODE_INVAL)
1847

    
1848
    if remote_node == secondary_node:
1849
      raise errors.OpPrereqError("The specified node is already the"
1850
                                 " secondary node of the instance",
1851
                                 errors.ECODE_INVAL)
1852

    
1853
    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
1854
                                    constants.REPLACE_DISK_CHG):
1855
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
1856
                                 errors.ECODE_INVAL)
1857

    
1858
    if self.mode == constants.REPLACE_DISK_AUTO:
1859
      if not self._CheckDisksActivated(instance):
1860
        raise errors.OpPrereqError("Please run activate-disks on instance %s"
1861
                                   " first" % self.instance_name,
1862
                                   errors.ECODE_STATE)
1863
      faulty_primary = self._FindFaultyDisks(instance.primary_node)
1864
      faulty_secondary = self._FindFaultyDisks(secondary_node)
1865

    
1866
      if faulty_primary and faulty_secondary:
1867
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
1868
                                   " one node and can not be repaired"
1869
                                   " automatically" % self.instance_name,
1870
                                   errors.ECODE_STATE)
1871

    
1872
      if faulty_primary:
1873
        self.disks = faulty_primary
1874
        self.target_node = instance.primary_node
1875
        self.other_node = secondary_node
1876
        check_nodes = [self.target_node, self.other_node]
1877
      elif faulty_secondary:
1878
        self.disks = faulty_secondary
1879
        self.target_node = secondary_node
1880
        self.other_node = instance.primary_node
1881
        check_nodes = [self.target_node, self.other_node]
1882
      else:
1883
        self.disks = []
1884
        check_nodes = []
1885

    
1886
    else:
1887
      # Non-automatic modes
1888
      if self.mode == constants.REPLACE_DISK_PRI:
1889
        self.target_node = instance.primary_node
1890
        self.other_node = secondary_node
1891
        check_nodes = [self.target_node, self.other_node]
1892

    
1893
      elif self.mode == constants.REPLACE_DISK_SEC:
1894
        self.target_node = secondary_node
1895
        self.other_node = instance.primary_node
1896
        check_nodes = [self.target_node, self.other_node]
1897

    
1898
      elif self.mode == constants.REPLACE_DISK_CHG:
1899
        self.new_node = remote_node
1900
        self.other_node = instance.primary_node
1901
        self.target_node = secondary_node
1902
        check_nodes = [self.new_node, self.other_node]
1903

    
1904
        _CheckNodeNotDrained(self.lu, remote_node)
1905
        _CheckNodeVmCapable(self.lu, remote_node)
1906

    
1907
        old_node_info = self.cfg.GetNodeInfo(secondary_node)
1908
        assert old_node_info is not None
1909
        if old_node_info.offline and not self.early_release:
1910
          # doesn't make sense to delay the release
1911
          self.early_release = True
1912
          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
1913
                          " early-release mode", secondary_node)
1914

    
1915
      else:
1916
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
1917
                                     self.mode)
1918

    
1919
      # If not specified all disks should be replaced
1920
      if not self.disks:
1921
        self.disks = range(len(self.instance.disks))
1922

    
1923
    # TODO: This is ugly, but right now we can't distinguish between an
1924
    # internally submitted opcode and an external one. We should fix that.
1925
    if self.remote_node_info:
1926
      # We change the node, lets verify it still meets instance policy
1927
      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
1928
      cluster = self.cfg.GetClusterInfo()
1929
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1930
                                                              new_group_info)
1931
      _CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
1932
                              self.cfg, ignore=self.ignore_ipolicy)
1933

    
1934
    for node in check_nodes:
1935
      _CheckNodeOnline(self.lu, node)
1936

    
1937
    touched_nodes = frozenset(node_name for node_name in [self.new_node,
1938
                                                          self.other_node,
1939
                                                          self.target_node]
1940
                              if node_name is not None)
1941

    
1942
    # Release unneeded node and node resource locks
1943
    _ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
1944
    _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
1945
    _ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
1946

    
1947
    # Release any owned node group
1948
    _ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
1949

    
1950
    # Check whether disks are valid
1951
    for disk_idx in self.disks:
1952
      instance.FindDisk(disk_idx)
1953

    
1954
    # Get secondary node IP addresses
1955
    self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node)
1956
                                  in self.cfg.GetMultiNodeInfo(touched_nodes))
1957

    
1958
  def Exec(self, feedback_fn):
1959
    """Execute disk replacement.
1960

1961
    This dispatches the disk replacement to the appropriate handler.
1962

1963
    """
1964
    if __debug__:
1965
      # Verify owned locks before starting operation
1966
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
1967
      assert set(owned_nodes) == set(self.node_secondary_ip), \
1968
          ("Incorrect node locks, owning %s, expected %s" %
1969
           (owned_nodes, self.node_secondary_ip.keys()))
1970
      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
1971
              self.lu.owned_locks(locking.LEVEL_NODE_RES))
1972
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1973

    
1974
      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
1975
      assert list(owned_instances) == [self.instance_name], \
1976
          "Instance '%s' not locked" % self.instance_name
1977

    
1978
      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
1979
          "Should not own any node group lock at this point"
1980

    
1981
    if not self.disks:
1982
      feedback_fn("No disks need replacement for instance '%s'" %
1983
                  self.instance.name)
1984
      return
1985

    
1986
    feedback_fn("Replacing disk(s) %s for instance '%s'" %
1987
                (utils.CommaJoin(self.disks), self.instance.name))
1988
    feedback_fn("Current primary node: %s" % self.instance.primary_node)
1989
    feedback_fn("Current seconary node: %s" %
1990
                utils.CommaJoin(self.instance.secondary_nodes))
1991

    
1992
    activate_disks = (self.instance.admin_state != constants.ADMINST_UP)
1993

    
1994
    # Activate the instance disks if we're replacing them on a down instance
1995
    if activate_disks:
1996
      _StartInstanceDisks(self.lu, self.instance, True)
1997

    
1998
    try:
1999
      # Should we replace the secondary node?
2000
      if self.new_node is not None:
2001
        fn = self._ExecDrbd8Secondary
2002
      else:
2003
        fn = self._ExecDrbd8DiskOnly
2004

    
2005
      result = fn(feedback_fn)
2006
    finally:
2007
      # Deactivate the instance disks if we're replacing them on a
2008
      # down instance
2009
      if activate_disks:
2010
        _SafeShutdownInstanceDisks(self.lu, self.instance)
2011

    
2012
    assert not self.lu.owned_locks(locking.LEVEL_NODE)
2013

    
2014
    if __debug__:
2015
      # Verify owned locks
2016
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
2017
      nodes = frozenset(self.node_secondary_ip)
2018
      assert ((self.early_release and not owned_nodes) or
2019
              (not self.early_release and not (set(owned_nodes) - nodes))), \
2020
        ("Not owning the correct locks, early_release=%s, owned=%r,"
2021
         " nodes=%r" % (self.early_release, owned_nodes, nodes))
2022

    
2023
    return result
2024

    
2025
  def _CheckVolumeGroup(self, nodes):
2026
    self.lu.LogInfo("Checking volume groups")
2027

    
2028
    vgname = self.cfg.GetVGName()
2029

    
2030
    # Make sure volume group exists on all involved nodes
2031
    results = self.rpc.call_vg_list(nodes)
2032
    if not results:
2033
      raise errors.OpExecError("Can't list volume groups on the nodes")
2034

    
2035
    for node in nodes:
2036
      res = results[node]
2037
      res.Raise("Error checking node %s" % node)
2038
      if vgname not in res.payload:
2039
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
2040
                                 (vgname, node))
2041

    
2042
  def _CheckDisksExistence(self, nodes):
2043
    # Check disk existence
2044
    for idx, dev in enumerate(self.instance.disks):
2045
      if idx not in self.disks:
2046
        continue
2047

    
2048
      for node in nodes:
2049
        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
2050
        self.cfg.SetDiskID(dev, node)
2051

    
2052
        result = _BlockdevFind(self, node, dev, self.instance)
2053

    
2054
        msg = result.fail_msg
2055
        if msg or not result.payload:
2056
          if not msg:
2057
            msg = "disk not found"
2058
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
2059
                                   (idx, node, msg))
2060

    
2061
  def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
2062
    for idx, dev in enumerate(self.instance.disks):
2063
      if idx not in self.disks:
2064
        continue
2065

    
2066
      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
2067
                      (idx, node_name))
2068

    
2069
      if not _CheckDiskConsistency(self.lu, self.instance, dev, node_name,
2070
                                   on_primary, ldisk=ldisk):
2071
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
2072
                                 " replace disks for instance %s" %
2073
                                 (node_name, self.instance.name))
2074

    
2075
  def _CreateNewStorage(self, node_name):
2076
    """Create new storage on the primary or secondary node.
2077

2078
    This is only used for same-node replaces, not for changing the
2079
    secondary node, hence we don't want to modify the existing disk.
2080

2081
    """
2082
    iv_names = {}
2083

    
2084
    disks = _AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2085
    for idx, dev in enumerate(disks):
2086
      if idx not in self.disks:
2087
        continue
2088

    
2089
      self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx)
2090

    
2091
      self.cfg.SetDiskID(dev, node_name)
2092

    
2093
      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
2094
      names = _GenerateUniqueNames(self.lu, lv_names)
2095

    
2096
      (data_disk, meta_disk) = dev.children
2097
      vg_data = data_disk.logical_id[0]
2098
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
2099
                             logical_id=(vg_data, names[0]),
2100
                             params=data_disk.params)
2101
      vg_meta = meta_disk.logical_id[0]
2102
      lv_meta = objects.Disk(dev_type=constants.LD_LV,
2103
                             size=constants.DRBD_META_SIZE,
2104
                             logical_id=(vg_meta, names[1]),
2105
                             params=meta_disk.params)
2106

    
2107
      new_lvs = [lv_data, lv_meta]
2108
      old_lvs = [child.Copy() for child in dev.children]
2109
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
2110
      excl_stor = _IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)
2111

    
2112
      # we pass force_create=True to force the LVM creation
2113
      for new_lv in new_lvs:
2114
        _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
2115
                             _GetInstanceInfoText(self.instance), False,
2116
                             excl_stor)
2117

    
2118
    return iv_names
2119

    
2120
  def _CheckDevices(self, node_name, iv_names):
2121
    for name, (dev, _, _) in iv_names.iteritems():
2122
      self.cfg.SetDiskID(dev, node_name)
2123

    
2124
      result = _BlockdevFind(self, node_name, dev, self.instance)
2125

    
2126
      msg = result.fail_msg
2127
      if msg or not result.payload:
2128
        if not msg:
2129
          msg = "disk not found"
2130
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
2131
                                 (name, msg))
2132

    
2133
      if result.payload.is_degraded:
2134
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
2135

    
2136
  def _RemoveOldStorage(self, node_name, iv_names):
2137
    for name, (_, old_lvs, _) in iv_names.iteritems():
2138
      self.lu.LogInfo("Remove logical volumes for %s", name)
2139

    
2140
      for lv in old_lvs:
2141
        self.cfg.SetDiskID(lv, node_name)
2142

    
2143
        msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
2144
        if msg:
2145
          self.lu.LogWarning("Can't remove old LV: %s", msg,
2146
                             hint="remove unused LVs manually")
2147

    
2148
  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
2149
    """Replace a disk on the primary or secondary for DRBD 8.
2150

2151
    The algorithm for replace is quite complicated:
2152

2153
      1. for each disk to be replaced:
2154

2155
        1. create new LVs on the target node with unique names
2156
        1. detach old LVs from the drbd device
2157
        1. rename old LVs to name_replaced.<time_t>
2158
        1. rename new LVs to old LVs
2159
        1. attach the new LVs (with the old names now) to the drbd device
2160

2161
      1. wait for sync across all devices
2162

2163
      1. for each modified disk:
2164

2165
        1. remove old LVs (which have the name name_replaces.<time_t>)
2166

2167
    Failures are not very well handled.
2168

2169
    """
2170
    steps_total = 6
2171

    
2172
    # Step: check device activation
2173
    self.lu.LogStep(1, steps_total, "Check device existence")
2174
    self._CheckDisksExistence([self.other_node, self.target_node])
2175
    self._CheckVolumeGroup([self.target_node, self.other_node])
2176

    
2177
    # Step: check other node consistency
2178
    self.lu.LogStep(2, steps_total, "Check peer consistency")
2179
    self._CheckDisksConsistency(self.other_node,
2180
                                self.other_node == self.instance.primary_node,
2181
                                False)
2182

    
2183
    # Step: create new storage
2184
    self.lu.LogStep(3, steps_total, "Allocate new storage")
2185
    iv_names = self._CreateNewStorage(self.target_node)
2186

    
2187
    # Step: for each lv, detach+rename*2+attach
2188
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2189
    for dev, old_lvs, new_lvs in iv_names.itervalues():
2190
      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
2191

    
2192
      result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
2193
                                                     old_lvs)
2194
      result.Raise("Can't detach drbd from local storage on node"
2195
                   " %s for device %s" % (self.target_node, dev.iv_name))
2196
      #dev.children = []
2197
      #cfg.Update(instance)
2198

    
2199
      # ok, we created the new LVs, so now we know we have the needed
2200
      # storage; as such, we proceed on the target node to rename
2201
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
2202
      # using the assumption that logical_id == physical_id (which in
2203
      # turn is the unique_id on that node)
2204

    
2205
      # FIXME(iustin): use a better name for the replaced LVs
2206
      temp_suffix = int(time.time())
2207
      ren_fn = lambda d, suff: (d.physical_id[0],
2208
                                d.physical_id[1] + "_replaced-%s" % suff)
2209

    
2210
      # Build the rename list based on what LVs exist on the node
2211
      rename_old_to_new = []
2212
      for to_ren in old_lvs:
2213
        result = self.rpc.call_blockdev_find(self.target_node, to_ren)
2214
        if not result.fail_msg and result.payload:
2215
          # device exists
2216
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
2217

    
2218
      self.lu.LogInfo("Renaming the old LVs on the target node")
2219
      result = self.rpc.call_blockdev_rename(self.target_node,
2220
                                             rename_old_to_new)
2221
      result.Raise("Can't rename old LVs on node %s" % self.target_node)
2222

    
2223
      # Now we rename the new LVs to the old LVs
2224
      self.lu.LogInfo("Renaming the new LVs on the target node")
2225
      rename_new_to_old = [(new, old.physical_id)
2226
                           for old, new in zip(old_lvs, new_lvs)]
2227
      result = self.rpc.call_blockdev_rename(self.target_node,
2228
                                             rename_new_to_old)
2229
      result.Raise("Can't rename new LVs on node %s" % self.target_node)
2230

    
2231
      # Intermediate steps of in memory modifications
2232
      for old, new in zip(old_lvs, new_lvs):
2233
        new.logical_id = old.logical_id
2234
        self.cfg.SetDiskID(new, self.target_node)
2235

    
2236
      # We need to modify old_lvs so that removal later removes the
2237
      # right LVs, not the newly added ones; note that old_lvs is a
2238
      # copy here
2239
      for disk in old_lvs:
2240
        disk.logical_id = ren_fn(disk, temp_suffix)
2241
        self.cfg.SetDiskID(disk, self.target_node)
2242

    
2243
      # Now that the new lvs have the old name, we can add them to the device
2244
      self.lu.LogInfo("Adding new mirror component on %s", self.target_node)
2245
      result = self.rpc.call_blockdev_addchildren(self.target_node,
2246
                                                  (dev, self.instance), new_lvs)
2247
      msg = result.fail_msg
2248
      if msg:
2249
        for new_lv in new_lvs:
2250
          msg2 = self.rpc.call_blockdev_remove(self.target_node,
2251
                                               new_lv).fail_msg
2252
          if msg2:
2253
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
2254
                               hint=("cleanup manually the unused logical"
2255
                                     "volumes"))
2256
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
2257

    
2258
    cstep = itertools.count(5)
2259

    
2260
    if self.early_release:
2261
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2262
      self._RemoveOldStorage(self.target_node, iv_names)
2263
      # TODO: Check if releasing locks early still makes sense
2264
      _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2265
    else:
2266
      # Release all resource locks except those used by the instance
2267
      _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2268
                    keep=self.node_secondary_ip.keys())
2269

    
2270
    # Release all node locks while waiting for sync
2271
    _ReleaseLocks(self.lu, locking.LEVEL_NODE)
2272

    
2273
    # TODO: Can the instance lock be downgraded here? Take the optional disk
2274
    # shutdown in the caller into consideration.
2275

    
2276
    # Wait for sync
2277
    # This can fail as the old devices are degraded and _WaitForSync
2278
    # does a combined result over all disks, so we don't check its return value
2279
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2280
    _WaitForSync(self.lu, self.instance)
2281

    
2282
    # Check all devices manually
2283
    self._CheckDevices(self.instance.primary_node, iv_names)
2284

    
2285
    # Step: remove old storage
2286
    if not self.early_release:
2287
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2288
      self._RemoveOldStorage(self.target_node, iv_names)
2289

    
2290
  def _ExecDrbd8Secondary(self, feedback_fn):
2291
    """Replace the secondary node for DRBD 8.
2292

2293
    The algorithm for replace is quite complicated:
2294
      - for all disks of the instance:
2295
        - create new LVs on the new node with same names
2296
        - shutdown the drbd device on the old secondary
2297
        - disconnect the drbd network on the primary
2298
        - create the drbd device on the new secondary
2299
        - network attach the drbd on the primary, using an artifice:
2300
          the drbd code for Attach() will connect to the network if it
2301
          finds a device which is connected to the good local disks but
2302
          not network enabled
2303
      - wait for sync across all devices
2304
      - remove all disks from the old secondary
2305

2306
    Failures are not very well handled.
2307

2308
    """
2309
    steps_total = 6
2310

    
2311
    pnode = self.instance.primary_node
2312

    
2313
    # Step: check device activation
2314
    self.lu.LogStep(1, steps_total, "Check device existence")
2315
    self._CheckDisksExistence([self.instance.primary_node])
2316
    self._CheckVolumeGroup([self.instance.primary_node])
2317

    
2318
    # Step: check other node consistency
2319
    self.lu.LogStep(2, steps_total, "Check peer consistency")
2320
    self._CheckDisksConsistency(self.instance.primary_node, True, True)
2321

    
2322
    # Step: create new storage
2323
    self.lu.LogStep(3, steps_total, "Allocate new storage")
2324
    disks = _AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2325
    excl_stor = _IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
2326
    for idx, dev in enumerate(disks):
2327
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
2328
                      (self.new_node, idx))
2329
      # we pass force_create=True to force LVM creation
2330
      for new_lv in dev.children:
2331
        _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
2332
                             True, _GetInstanceInfoText(self.instance), False,
2333
                             excl_stor)
2334

    
2335
    # Step 4: drbd minors and drbd setup changes
2336
    # after this, we must manually remove the drbd minors on both the
2337
    # error and the success paths
2338
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2339
    minors = self.cfg.AllocateDRBDMinor([self.new_node
2340
                                         for dev in self.instance.disks],
2341
                                        self.instance.name)
2342
    logging.debug("Allocated minors %r", minors)
2343

    
2344
    iv_names = {}
2345
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
2346
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
2347
                      (self.new_node, idx))
2348
      # create new devices on new_node; note that we create two IDs:
2349
      # one without port, so the drbd will be activated without
2350
      # networking information on the new node at this stage, and one
2351
      # with network, for the later activation in step 4
2352
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
2353
      if self.instance.primary_node == o_node1:
2354
        p_minor = o_minor1
2355
      else:
2356
        assert self.instance.primary_node == o_node2, "Three-node instance?"
2357
        p_minor = o_minor2
2358

    
2359
      new_alone_id = (self.instance.primary_node, self.new_node, None,
2360
                      p_minor, new_minor, o_secret)
2361
      new_net_id = (self.instance.primary_node, self.new_node, o_port,
2362
                    p_minor, new_minor, o_secret)
2363

    
2364
      iv_names[idx] = (dev, dev.children, new_net_id)
2365
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
2366
                    new_net_id)
2367
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
2368
                              logical_id=new_alone_id,
2369
                              children=dev.children,
2370
                              size=dev.size,
2371
                              params={})
2372
      (anno_new_drbd,) = _AnnotateDiskParams(self.instance, [new_drbd],
2373
                                             self.cfg)
2374
      try:
2375
        _CreateSingleBlockDev(self.lu, self.new_node, self.instance,
2376
                              anno_new_drbd,
2377
                              _GetInstanceInfoText(self.instance), False,
2378
                              excl_stor)
2379
      except errors.GenericError:
2380
        self.cfg.ReleaseDRBDMinors(self.instance.name)
2381
        raise
2382

    
2383
    # We have new devices, shutdown the drbd on the old secondary
2384
    for idx, dev in enumerate(self.instance.disks):
2385
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
2386
      self.cfg.SetDiskID(dev, self.target_node)
2387
      msg = self.rpc.call_blockdev_shutdown(self.target_node,
2388
                                            (dev, self.instance)).fail_msg
2389
      if msg:
2390
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
2391
                           "node: %s" % (idx, msg),
2392
                           hint=("Please cleanup this device manually as"
2393
                                 " soon as possible"))
2394

    
2395
    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2396
    result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
2397
                                               self.instance.disks)[pnode]
2398

    
2399
    msg = result.fail_msg
2400
    if msg:
2401
      # detaches didn't succeed (unlikely)
2402
      self.cfg.ReleaseDRBDMinors(self.instance.name)
2403
      raise errors.OpExecError("Can't detach the disks from the network on"
2404
                               " old node: %s" % (msg,))
2405

    
2406
    # if we managed to detach at least one, we update all the disks of
2407
    # the instance to point to the new secondary
2408
    self.lu.LogInfo("Updating instance configuration")
2409
    for dev, _, new_logical_id in iv_names.itervalues():
2410
      dev.logical_id = new_logical_id
2411
      self.cfg.SetDiskID(dev, self.instance.primary_node)
2412

    
2413
    self.cfg.Update(self.instance, feedback_fn)
2414

    
2415
    # Release all node locks (the configuration has been updated)
2416
    _ReleaseLocks(self.lu, locking.LEVEL_NODE)
2417

    
2418
    # and now perform the drbd attach
2419
    self.lu.LogInfo("Attaching primary drbds to new secondary"
2420
                    " (standalone => connected)")
2421
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2422
                                            self.new_node],
2423
                                           self.node_secondary_ip,
2424
                                           (self.instance.disks, self.instance),
2425
                                           self.instance.name,
2426
                                           False)
2427
    for to_node, to_result in result.items():
2428
      msg = to_result.fail_msg
2429
      if msg:
2430
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
2431
                           to_node, msg,
2432
                           hint=("please do a gnt-instance info to see the"
2433
                                 " status of disks"))
2434

    
2435
    cstep = itertools.count(5)
2436

    
2437
    if self.early_release:
2438
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2439
      self._RemoveOldStorage(self.target_node, iv_names)
2440
      # TODO: Check if releasing locks early still makes sense
2441
      _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2442
    else:
2443
      # Release all resource locks except those used by the instance
2444
      _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2445
                    keep=self.node_secondary_ip.keys())
2446

    
2447
    # TODO: Can the instance lock be downgraded here? Take the optional disk
2448
    # shutdown in the caller into consideration.
2449

    
2450
    # Wait for sync
2451
    # This can fail as the old devices are degraded and _WaitForSync
2452
    # does a combined result over all disks, so we don't check its return value
2453
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2454
    _WaitForSync(self.lu, self.instance)
2455

    
2456
    # Check all devices manually
2457
    self._CheckDevices(self.instance.primary_node, iv_names)
2458

    
2459
    # Step: remove old storage
2460
    if not self.early_release:
2461
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2462
      self._RemoveOldStorage(self.target_node, iv_names)
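
# Illustrative sketch (editor addition, not used by the code above): the LV
# naming dance performed by TLReplaceDisks._ExecDrbd8DiskOnly for one disk,
# reduced to plain (vg, lv_name) tuples.  The old LVs are first renamed out
# of the way with a timestamp suffix, then the new LVs take over the old
# names so the DRBD device can be re-attached to components whose names have
# not changed.  Function and parameter names are hypothetical; the real code
# builds the same pairs from objects.Disk physical IDs.
def _ExamplePlanLvSwap(old_ids, new_ids, temp_suffix):
  """Return (rename_old_to_new, rename_new_to_old) rename lists.

  """
  rename_old_to_new = [(old, (old[0], old[1] + "_replaced-%s" % temp_suffix))
                       for old in old_ids]
  rename_new_to_old = [(new, old) for old, new in zip(old_ids, new_ids)]
  return (rename_old_to_new, rename_new_to_old)

# Example with one data/meta pair (volume group and LV names made up):
#   old = [("xenvg", "aaaa.disk0_data"), ("xenvg", "aaaa.disk0_meta")]
#   new = [("xenvg", "bbbb.disk0_data"), ("xenvg", "bbbb.disk0_meta")]
#   _ExamplePlanLvSwap(old, new, 1500000000)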