# lib/cmdlib/instance_storage.py @ 6ef8077e
#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with storage of instances."""

import itertools
import logging
import os
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import ht
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import objects
from ganeti import utils
from ganeti import opcodes
from ganeti import rpc
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
  AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeName, \
  CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
  IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks
from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
  CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy

import ganeti.masterd.instance


_DISK_TEMPLATE_NAME_PREFIX = {
  constants.DT_PLAIN: "",
  constants.DT_RBD: ".rbd",
  constants.DT_EXT: ".ext",
  }


_DISK_TEMPLATE_DEVICE_TYPE = {
  constants.DT_PLAIN: constants.LD_LV,
  constants.DT_FILE: constants.LD_FILE,
  constants.DT_SHARED_FILE: constants.LD_FILE,
  constants.DT_BLOCK: constants.LD_BLOCKDEV,
  constants.DT_RBD: constants.LD_RBD,
  constants.DT_EXT: constants.LD_EXT,
  }


def CreateSingleBlockDev(lu, node, instance, device, info, force_open,
                         excl_stor):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  """
  lu.cfg.SetDiskID(device, node)
  result = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, force_open, info,
                                       excl_stor)
  result.Raise("Can't create block device %s on"
               " node %s for instance %s" % (device, node, instance.name))
  if device.physical_id is None:
    device.physical_id = result.payload


def _CreateBlockDevInner(lu, node, instance, device, force_create,
                         info, force_open, excl_stor):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @attention: The device has to be annotated already.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has
      CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  @return: list of created devices
  """
  created_devices = []
  try:
    if device.CreateOnSecondary():
      force_create = True

    if device.children:
      for child in device.children:
        devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
                                    info, force_open, excl_stor)
        created_devices.extend(devs)

    if not force_create:
      return created_devices

    CreateSingleBlockDev(lu, node, instance, device, info, force_open,
                         excl_stor)
    # The device has been completely created, so there is no point in keeping
    # its subdevices in the list. We just add the device itself instead.
    created_devices = [(node, device)]
    return created_devices

  except errors.DeviceCreationError, e:
    e.created_devices.extend(created_devices)
    raise e
  except errors.OpExecError, e:
    raise errors.DeviceCreationError(str(e), created_devices)
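
# Illustrative note (not part of the original source): the DeviceCreationError
# raised above carries the partially-created devices, so a caller can roll
# back whatever was already created. A minimal, hypothetical usage sketch:
#
#   try:
#     created = _CreateBlockDevInner(lu, node, instance, disk, True, info,
#                                    True, excl_stor)
#   except errors.DeviceCreationError, err:
#     _UndoCreateDisks(lu, err.created_devices)  # remove what was created
#     raise errors.OpExecError(str(err))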


def IsExclusiveStorageEnabledNodeName(cfg, nodename):
  """Whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type nodename: string
  @param nodename: The node
  @rtype: bool
  @return: The effective value of exclusive_storage
  @raise errors.OpPrereqError: if no node exists with the given name

  """
  ni = cfg.GetNodeInfo(nodename)
  if ni is None:
    raise errors.OpPrereqError("Invalid node name %s" % nodename,
                               errors.ECODE_NOENT)
  return IsExclusiveStorageEnabledNode(cfg, ni)


def _CreateBlockDev(lu, node, instance, device, force_create, info,
                    force_open):
  """Wrapper around L{_CreateBlockDevInner}.

  This method annotates the root device first.

  """
  (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
  excl_stor = IsExclusiveStorageEnabledNodeName(lu.cfg, node)
  return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
                              force_open, excl_stor)


def _UndoCreateDisks(lu, disks_created):
  """Undo the work performed by L{CreateDisks}.

  This function is called in case of an error to undo the work of
  L{CreateDisks}.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @param disks_created: the result returned by L{CreateDisks}

  """
  for (node, disk) in disks_created:
    lu.cfg.SetDiskID(disk, node)
    result = lu.rpc.call_blockdev_remove(node, disk)
    if result.fail_msg:
      logging.warning("Failed to remove newly-created disk %s on node %s:"
                      " %s", disk, node, result.fail_msg)


def CreateDisks(lu, instance, to_skip=None, target_node=None, disks=None):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @type to_skip: list
  @param to_skip: list of indices to skip
  @type target_node: string
  @param target_node: if passed, overrides the target node for creation
  @type disks: list of {objects.Disk}
  @param disks: the disks to create; if not specified, all the disks of the
      instance are created
  @return: information about the created disks, to be used to call
      L{_UndoCreateDisks}
  @raise errors.OpPrereqError: in case of error

  """
  info = GetInstanceInfoText(instance)
  if target_node is None:
    pnode = instance.primary_node
    all_nodes = instance.all_nodes
  else:
    pnode = target_node
    all_nodes = [pnode]

  if disks is None:
    disks = instance.disks

  if instance.disk_template in constants.DTS_FILEBASED:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)

    result.Raise("Failed to create directory '%s' on"
                 " node %s" % (file_storage_dir, pnode))

  disks_created = []
  for idx, device in enumerate(disks):
    if to_skip and idx in to_skip:
      continue
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
    for node in all_nodes:
      f_create = node == pnode
      try:
        _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
        disks_created.append((node, device))
      except errors.DeviceCreationError, e:
        logging.warning("Creating disk %s for instance '%s' failed",
                        idx, instance.name)
        disks_created.extend(e.created_devices)
        _UndoCreateDisks(lu, disks_created)
        raise errors.OpExecError(e.message)
  return disks_created
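
# Illustrative example (hypothetical node name and repr, not from the original
# source): CreateDisks returns the (node, disk) pairs that were created, in a
# form suitable for _UndoCreateDisks. For a plain instance with two disks on
# "node1.example.com" it would resemble
#   [("node1.example.com", <objects.Disk disk/0>),
#    ("node1.example.com", <objects.Disk disk/1>)]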


def ComputeDiskSizePerVG(disk_template, disks):
  """Compute disk size requirements in the volume group

  """
  def _compute(disks, payload):
    """Universal algorithm.

    """
    vgs = {}
    for disk in disks:
      vg_name = disk[constants.IDISK_VG]
      vgs[vg_name] = \
        vgs.get(vg_name, 0) + disk[constants.IDISK_SIZE] + payload

    return vgs

  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: {},
    constants.DT_PLAIN: _compute(disks, 0),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
    constants.DT_FILE: {},
    constants.DT_SHARED_FILE: {},
    }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
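
# Worked example (illustrative, hypothetical VG name): for two disks of
# 10240 MiB and 5120 MiB in volume group "xenvg", DT_PLAIN charges only the
# disk sizes, giving {"xenvg": 15360}, while DT_DRBD8 adds DRBD_META_SIZE
# (128 MiB, per the comment above) per disk, giving
# {"xenvg": (10240 + 128) + (5120 + 128)} == {"xenvg": 15616}.
# File-based and diskless templates return an empty dict.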


def ComputeDisks(op, default_vg):
  """Computes the instance disks.

  @param op: The instance opcode
  @param default_vg: The default_vg to assume

  @return: The computed disks

  """
  disks = []
  for disk in op.disks:
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
    if mode not in constants.DISK_ACCESS_SET:
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                 mode, errors.ECODE_INVAL)
    size = disk.get(constants.IDISK_SIZE, None)
    if size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      size = int(size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                 errors.ECODE_INVAL)

    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
    if ext_provider and op.disk_template != constants.DT_EXT:
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
                                 " disk template, not %s" %
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
                                  op.disk_template), errors.ECODE_INVAL)

    data_vg = disk.get(constants.IDISK_VG, default_vg)
    name = disk.get(constants.IDISK_NAME, None)
    if name is not None and name.lower() == constants.VALUE_NONE:
      name = None
    new_disk = {
      constants.IDISK_SIZE: size,
      constants.IDISK_MODE: mode,
      constants.IDISK_VG: data_vg,
      constants.IDISK_NAME: name,
      }

    for key in [
      constants.IDISK_METAVG,
      constants.IDISK_ADOPT,
      constants.IDISK_SPINDLES,
      ]:
      if key in disk:
        new_disk[key] = disk[key]

    # For extstorage, demand the `provider' option and add any
    # additional parameters (ext-params) to the dict
    if op.disk_template == constants.DT_EXT:
      if ext_provider:
        new_disk[constants.IDISK_PROVIDER] = ext_provider
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            new_disk[key] = disk[key]
      else:
        raise errors.OpPrereqError("Missing provider for template '%s'" %
                                   constants.DT_EXT, errors.ECODE_INVAL)

    disks.append(new_disk)

  return disks
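
# Illustrative example (hypothetical values, not from the original source):
# an opcode disk specification of
#   [{constants.IDISK_SIZE: 10240, constants.IDISK_MODE: constants.DISK_RDWR}]
# with default_vg "xenvg" is normalized by ComputeDisks to
#   [{constants.IDISK_SIZE: 10240, constants.IDISK_MODE: constants.DISK_RDWR,
#     constants.IDISK_VG: "xenvg", constants.IDISK_NAME: None}]
# plus any metavg/adopt/spindles keys present in the input, and the provider
# plus extra ext-params when the template is DT_EXT.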


def CheckRADOSFreeSpace():
  """Compute disk size requirements inside the RADOS cluster.

  """
  # For the RADOS cluster we assume there is always enough space.
  pass


def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
                         iv_name, p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  assert len(vgnames) == len(names) == 2
  port = lu.cfg.AllocatePort()
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())

  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgnames[0], names[0]),
                          params={})
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  dev_meta = objects.Disk(dev_type=constants.LD_LV,
                          size=constants.DRBD_META_SIZE,
                          logical_id=(vgnames[1], names[1]),
                          params={})
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name, params={})
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  return drbd_dev
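
# Illustrative note (not part of the original source): the returned DRBD8 disk
# is a two-level tree. For size=10240 and names ("..._data", "..._meta") it
# wraps an LD_LV data child of 10240 MiB and an LD_LV metadata child of
# constants.DRBD_META_SIZE MiB, and its logical_id is the tuple
# (primary, secondary, port, p_minor, s_minor, shared_secret).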


def GenerateDiskTemplate(
  lu, template_name, instance_name, primary_node, secondary_nodes,
  disk_info, file_storage_dir, file_driver, base_index,
  feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
  _req_shr_file_storage=opcodes.RequireSharedFileStorage):
  """Generate the entire disk layout for a given template type.

  """
  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []

  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, remote_node] * len(disk_info), instance_name)

    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
                                                       full_disk_params)
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]

    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      data_vg = disk.get(constants.IDISK_VG, vgname)
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                      disk[constants.IDISK_SIZE],
                                      [data_vg, meta_vg],
                                      names[idx * 2:idx * 2 + 2],
                                      "disk/%d" % disk_index,
                                      minors[idx * 2], minors[idx * 2 + 1])
      disk_dev.mode = disk[constants.IDISK_MODE]
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disks.append(disk_dev)
  else:
    if secondary_nodes:
      raise errors.ProgrammerError("Wrong template configuration")

    if template_name == constants.DT_FILE:
      _req_file_storage()
    elif template_name == constants.DT_SHARED_FILE:
      _req_shr_file_storage()

    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
    if name_prefix is None:
      names = None
    else:
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
                                        (name_prefix, base_index + i)
                                        for i in range(disk_count)])

    if template_name == constants.DT_PLAIN:

      def logical_id_fn(idx, _, disk):
        vg = disk.get(constants.IDISK_VG, vgname)
        return (vg, names[idx])

    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
      logical_id_fn = \
        lambda _, disk_index, disk: (file_driver,
                                     "%s/disk%d" % (file_storage_dir,
                                                    disk_index))
    elif template_name == constants.DT_BLOCK:
      logical_id_fn = \
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
                                       disk[constants.IDISK_ADOPT])
    elif template_name == constants.DT_RBD:
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
    elif template_name == constants.DT_EXT:
      def logical_id_fn(idx, _, disk):
        provider = disk.get(constants.IDISK_PROVIDER, None)
        if provider is None:
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
                                       " not found", constants.DT_EXT,
                                       constants.IDISK_PROVIDER)
        return (provider, names[idx])
    else:
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)

    dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]

    for idx, disk in enumerate(disk_info):
      params = {}
      # Only for the Ext template add disk_info to params
      if template_name == constants.DT_EXT:
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            params[key] = disk[key]
      disk_index = idx + base_index
      size = disk[constants.IDISK_SIZE]
      feedback_fn("* disk %s, size %s" %
                  (disk_index, utils.FormatUnit(size, "h")))
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
                              logical_id=logical_id_fn(idx, disk_index, disk),
                              iv_name="disk/%d" % disk_index,
                              mode=disk[constants.IDISK_MODE],
                              params=params,
                              spindles=disk.get(constants.IDISK_SPINDLES))
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
      disks.append(disk_dev)

  return disks
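
# Illustrative example (hypothetical values, not from the original source):
# for template_name DT_RBD, base_index 0 and two disks,
# _GenerateUniqueNames is asked for [".rbd.disk0", ".rbd.disk1"] (see
# _DISK_TEMPLATE_NAME_PREFIX above), and each resulting disk gets
# logical_id ("rbd", <generated unique name>) with iv_name "disk/0" and
# "disk/1" respectively.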


def CheckSpindlesExclusiveStorage(diskdict, es_flag):
  """Check the presence of the spindle options with exclusive_storage.

  @type diskdict: dict
  @param diskdict: disk parameters
  @type es_flag: bool
  @param es_flag: the effective value of the exclusive_storage flag
  @raise errors.OpPrereqError: when spindles are given and they should not

  """
  if (not es_flag and constants.IDISK_SPINDLES in diskdict and
      diskdict[constants.IDISK_SPINDLES] is not None):
    raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
                               " when exclusive storage is not active",
                               errors.ECODE_INVAL)
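
# Illustrative example (hypothetical values): with exclusive storage disabled,
#   CheckSpindlesExclusiveStorage({constants.IDISK_SIZE: 10240,
#                                  constants.IDISK_SPINDLES: 2}, False)
# raises OpPrereqError, while the same call with es_flag=True, or without the
# spindles key, returns without error.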


class LUInstanceRecreateDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  _MODIFYABLE = compat.UniqueFrozenset([
    constants.IDISK_SIZE,
    constants.IDISK_MODE,
    constants.IDISK_SPINDLES,
    ])

  # New or changed disk parameters may have different semantics
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
    constants.IDISK_ADOPT,

    # TODO: Implement support changing VG while recreating
    constants.IDISK_VG,
    constants.IDISK_METAVG,
    constants.IDISK_PROVIDER,
    constants.IDISK_NAME,
    ]))

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)

    # FIXME
    # The allocator should actually run in "relocate" mode, but current
    # allocators don't support relocating all the nodes of an instance at
    # the same time. As a workaround we use "allocate" mode, but this is
    # suboptimal for two reasons:
    # - The instance name passed to the allocator is present in the list of
    #   existing instances, so there could be a conflict within the
    #   internal structures of the allocator. This doesn't happen with the
    #   current allocators, but it's a liability.
    # - The allocator counts the resources used by the instance twice: once
    #   because the instance exists already, and once because it tries to
    #   allocate a new instance.
    # The allocator could choose some of the nodes on which the instance is
    # running, but that's not a problem. If the instance nodes are broken,
    # they should already be marked as drained or offline, and hence
    # skipped by the allocator. If instance disks have been lost for other
    # reasons, then recreating the disks on the same nodes should be fine.
    disk_template = self.instance.disk_template
    spindle_use = be_full[constants.BE_SPINDLE_USE]
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
                                        disk_template=disk_template,
                                        tags=list(self.instance.GetTags()),
                                        os=self.instance.os,
                                        nics=[{}],
                                        vcpus=be_full[constants.BE_VCPUS],
                                        memory=be_full[constants.BE_MAXMEM],
                                        spindle_use=spindle_use,
                                        disks=[{constants.IDISK_SIZE: d.size,
                                                constants.IDISK_MODE: d.mode}
                                               for d in self.instance.disks],
                                        hypervisor=self.instance.hypervisor,
                                        node_whitelist=None)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    assert req.RequiredNodes() == len(self.instance.all_nodes)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    self.op.nodes = ial.result
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 utils.CommaJoin(ial.result))

  def CheckArguments(self):
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
      # Normalize and convert deprecated list of disk indices
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]

    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
    if duplicates:
      raise errors.OpPrereqError("Some disks have been specified more than"
                                 " once: %s" % utils.CommaJoin(duplicates),
                                 errors.ECODE_INVAL)

    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
    # when neither iallocator nor nodes are specified
    if self.op.iallocator or self.op.nodes:
      CheckIAllocatorOrNode(self, "iallocator", "nodes")

    for (idx, params) in self.op.disks:
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
      if unsupported:
        raise errors.OpPrereqError("Parameters for disk %s try to change"
                                   " unmodifiable parameter(s): %s" %
                                   (idx, utils.CommaJoin(unsupported)),
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    if self.op.nodes:
      self.op.nodes = [ExpandNodeName(self.cfg, n) for n in self.op.nodes]
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      if self.op.iallocator:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.iallocator is not None
      assert not self.op.nodes
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock the primary group used by the instance optimistically; this
      # requires going via the node before it's locked, requiring
      # verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)

    elif level == locking.LEVEL_NODE:
      # If an allocator is used, then we lock all the nodes in the current
      # instance group, as we don't know yet which ones will be selected;
      # if we replace the nodes without using an allocator, locks are
      # already declared in ExpandNames; otherwise, we need to lock all the
      # instance nodes for disk re-creation
      if self.op.iallocator:
        assert not self.op.nodes
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1

        # Lock member nodes of the group of the primary node
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
          self.needed_locks[locking.LEVEL_NODE].extend(
            self.cfg.GetNodeGroup(group_uuid).members)

        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
      elif not self.op.nodes:
        self._LockInstancesNodes(primary_only=False)
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    return BuildInstanceHookEnvByObject(self, self.instance)

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    if self.op.nodes:
      if len(self.op.nodes) != len(instance.all_nodes):
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
                                   " %d replacement nodes were specified" %
                                   (instance.name, len(instance.all_nodes),
                                    len(self.op.nodes)),
                                   errors.ECODE_INVAL)
      assert instance.disk_template != constants.DT_DRBD8 or \
             len(self.op.nodes) == 2
      assert instance.disk_template != constants.DT_PLAIN or \
             len(self.op.nodes) == 1
      primary_node = self.op.nodes[0]
    else:
      primary_node = instance.primary_node
    if not self.op.iallocator:
      CheckNodeOnline(self, primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name, errors.ECODE_INVAL)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      # Node group locks are acquired only for the primary node (and only
      # when the allocator is used)
      CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
                              primary_only=True)

    # if we replace nodes *and* the old primary is offline, we don't
    # check the instance state
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
    if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                         msg="cannot recreate disks")

    if self.op.disks:
      self.disks = dict(self.op.disks)
    else:
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))

    maxidx = max(self.disks.keys())
    if maxidx >= len(instance.disks):
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
                                 errors.ECODE_INVAL)

    if ((self.op.nodes or self.op.iallocator) and
         sorted(self.disks.keys()) != range(len(instance.disks))):
      raise errors.OpPrereqError("Can't recreate disks partially and"
                                 " change the nodes at the same time",
                                 errors.ECODE_INVAL)

    self.instance = instance

    if self.op.iallocator:
      self._RunAllocator()
      # Release unneeded node and node resource locks
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)

    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

    if self.op.nodes:
      nodes = self.op.nodes
    else:
      nodes = instance.all_nodes
    excl_stor = compat.any(
      rpc.GetExclusiveStorageForNodeNames(self.cfg, nodes).values()
      )
    for new_params in self.disks.values():
      CheckSpindlesExclusiveStorage(new_params, excl_stor)

  def Exec(self, feedback_fn):
    """Recreate the disks.

    """
    instance = self.instance

    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    to_skip = []
    mods = [] # keeps track of needed changes

    for idx, disk in enumerate(instance.disks):
      try:
        changes = self.disks[idx]
      except KeyError:
        # Disk should not be recreated
        to_skip.append(idx)
        continue

      # update secondaries for disks, if needed
      if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
        # need to update the nodes and minors
        assert len(self.op.nodes) == 2
        assert len(disk.logical_id) == 6 # otherwise disk internals
                                         # have changed
        (_, _, old_port, _, _, old_secret) = disk.logical_id
        new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
        new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
                  new_minors[0], new_minors[1], old_secret)
        assert len(disk.logical_id) == len(new_id)
      else:
        new_id = None

      mods.append((idx, new_id, changes))

    # now that we have passed all asserts above, we can apply the mods
    # in a single run (to avoid partial changes)
    for idx, new_id, changes in mods:
      disk = instance.disks[idx]
      if new_id is not None:
        assert disk.dev_type == constants.LD_DRBD8
        disk.logical_id = new_id
      if changes:
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
                    mode=changes.get(constants.IDISK_MODE, None),
                    spindles=changes.get(constants.IDISK_SPINDLES, None))

    # change primary node, if needed
    if self.op.nodes:
      instance.primary_node = self.op.nodes[0]
      self.LogWarning("Changing the instance's nodes, you will have to"
                      " remove any disks left on the older nodes manually")

    if self.op.nodes:
      self.cfg.Update(instance, feedback_fn)

    # All touched nodes must be locked
    mylocks = self.owned_locks(locking.LEVEL_NODE)
    assert mylocks.issuperset(frozenset(instance.all_nodes))
    new_disks = CreateDisks(self, instance, to_skip=to_skip)

    # TODO: Release node locks before wiping, or explain why it's not possible
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
      wipedisks = [(idx, disk, 0)
                   for (idx, disk) in enumerate(instance.disks)
                   if idx not in to_skip]
      WipeOrCleanupDisks(self, instance, disks=wipedisks, cleanup=new_disks)


def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
  """Checks if nodes have enough free disk space in the specified VG.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type nodenames: C{list}
  @param nodenames: the list of node names to check
  @type vg: C{str}
  @param vg: the volume group to check
  @type requested: C{int}
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
  # FIXME: This maps everything to storage type 'lvm-vg' to maintain
  # the current functionality. Refactor to make it more flexible.
  nodeinfo = lu.rpc.call_node_info(nodenames, [(constants.ST_LVM_VG, vg)], None,
                                   es_flags)
  for node in nodenames:
    info = nodeinfo[node]
    info.Raise("Cannot get current information from node %s" % node,
               prereq=True, ecode=errors.ECODE_ENVIRON)
    (_, (vg_info, ), _) = info.payload
    vg_free = vg_info.get("vg_free", None)
    if not isinstance(vg_free, int):
      raise errors.OpPrereqError("Can't compute free disk space on node"
                                 " %s for vg %s, result was '%s'" %
                                 (node, vg, vg_free), errors.ECODE_ENVIRON)
    if requested > vg_free:
      raise errors.OpPrereqError("Not enough disk space on target node %s"
                                 " vg %s: required %d MiB, available %d MiB" %
                                 (node, vg, requested, vg_free),
                                 errors.ECODE_NORES)


def CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
  """Checks if nodes have enough free disk space in all the VGs.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type nodenames: C{list}
  @param nodenames: the list of node names to check
  @type req_sizes: C{dict}
  @param req_sizes: the hash of vg and corresponding amount of disk in
      MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  for vg, req_size in req_sizes.items():
    _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
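
# Illustrative example (hypothetical values): req_sizes as produced by
# ComputeDiskSizePerVG, e.g. {"xenvg": 15616}, makes this function verify that
# every node in nodenames reports at least 15616 MiB free in volume group
# "xenvg", raising OpPrereqError otherwise.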


def _DiskSizeInBytesToMebibytes(lu, size):
  """Converts a disk size in bytes to mebibytes.

  Warns and rounds up if the size isn't an even multiple of 1 MiB.

  """
  (mib, remainder) = divmod(size, 1024 * 1024)

  if remainder != 0:
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
                  " to not overwrite existing data (%s bytes will not be"
                  " wiped)", (1024 * 1024) - remainder)
    mib += 1

  return mib
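
# Worked example (illustrative): for size = 5 * 1024 * 1024 + 1 bytes,
# divmod() yields (5, 1); the remainder is non-zero, so a warning is logged
# and 6 is returned, with 1048575 bytes of the last mebibyte left unwiped as
# the warning message states.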


def _CalcEta(time_taken, written, total_size):
  """Calculates the ETA based on size written and total size.

  @param time_taken: The time taken so far
  @param written: amount written so far
  @param total_size: The total size of data to be written
  @return: The remaining time in seconds

  """
  avg_time = time_taken / float(written)
  return (total_size - written) * avg_time
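
# Worked example (illustrative): _CalcEta(30.0, 512, 4096) computes an average
# of 30.0 / 512 = 0.0586 seconds per unit written and returns
# (4096 - 512) * 0.0586 = 210.0 seconds of estimated remaining time.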


def WipeDisks(lu, instance, disks=None):
  """Wipes instance disks.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
  @param disks: Disk details; tuple contains disk index, disk object and the
    start offset

  """
  node = instance.primary_node

  if disks is None:
    disks = [(idx, disk, 0)
             for (idx, disk) in enumerate(instance.disks)]

  for (_, device, _) in disks:
    lu.cfg.SetDiskID(device, node)

  logging.info("Pausing synchronization of disks of instance '%s'",
               instance.name)
  result = lu.rpc.call_blockdev_pause_resume_sync(node,
                                                  (map(compat.snd, disks),
                                                   instance),
                                                  True)
  result.Raise("Failed to pause disk synchronization on node '%s'" % node)

  for idx, success in enumerate(result.payload):
    if not success:
      logging.warning("Pausing synchronization of disk %s of instance '%s'"
                      " failed", idx, instance.name)

  try:
    for (idx, device, offset) in disks:
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
      wipe_chunk_size = \
        int(min(constants.MAX_WIPE_CHUNK,
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))

      size = device.size
      last_output = 0
      start_time = time.time()

      if offset == 0:
        info_text = ""
      else:
        info_text = (" (from %s to %s)" %
                     (utils.FormatUnit(offset, "h"),
                      utils.FormatUnit(size, "h")))

      lu.LogInfo("* Wiping disk %s%s", idx, info_text)

      logging.info("Wiping disk %d for instance %s on node %s using"
                   " chunk size %s", idx, instance.name, node, wipe_chunk_size)

      while offset < size:
        wipe_size = min(wipe_chunk_size, size - offset)

        logging.debug("Wiping disk %d, offset %s, chunk %s",
                      idx, offset, wipe_size)

        result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
                                           wipe_size)
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
                     (idx, offset, wipe_size))

        now = time.time()
        offset += wipe_size
        if now - last_output >= 60:
          eta = _CalcEta(now - start_time, offset, size)
          lu.LogInfo(" - done: %.1f%% ETA: %s",
                     offset / float(size) * 100, utils.FormatSeconds(eta))
          last_output = now
  finally:
    logging.info("Resuming synchronization of disks for instance '%s'",
                 instance.name)

    result = lu.rpc.call_blockdev_pause_resume_sync(node,
                                                    (map(compat.snd, disks),
                                                     instance),
                                                    False)

    if result.fail_msg:
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
                    node, result.fail_msg)
    else:
      for idx, success in enumerate(result.payload):
        if not success:
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
                        " failed", idx, instance.name)


def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
  """Wrapper for L{WipeDisks} that handles errors.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @param disks: see L{WipeDisks}
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
      case of error
  @raise errors.OpPrereqError: in case of failure

  """
  try:
    WipeDisks(lu, instance, disks=disks)
  except errors.OpExecError:
    logging.warning("Wiping disks for instance '%s' failed",
                    instance.name)
    _UndoCreateDisks(lu, cleanup)
    raise


def ExpandCheckDisks(instance, disks):
  """Return the instance disks selected by the disks list

  @type disks: list of L{objects.Disk} or None
  @param disks: selected disks
  @rtype: list of L{objects.Disk}
  @return: selected instance disks to act on

  """
  if disks is None:
    return instance.disks
  else:
    if not set(disks).issubset(instance.disks):
      raise errors.ProgrammerError("Can only act on disks belonging to the"
                                   " target instance: expected a subset of %r,"
                                   " got %r" % (instance.disks, disks))
    return disks


def WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)

  node = instance.primary_node

  for dev in disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.LogInfo("- device %s: %5.2f%% done, %s",
                   disks[i].iv_name, mstat.sync_percent, rem_time)

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.LogInfo("Instance %s's disks are in sync", instance.name)

  return not cumul_degraded


def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is false, errors on the primary node are not
  ignored.

  """
  all_result = True
  disks = ExpandCheckDisks(instance, disks)

  for disk in disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if ((node == instance.primary_node and not ignore_primary) or
            (node != instance.primary_node and not result.offline)):
          all_result = False
  return all_result


def _SafeShutdownInstanceDisks(lu, instance, disks=None):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  ShutdownInstanceDisks.

  """
  CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
  ShutdownInstanceDisks(lu, instance, disks=disks)


def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
                           ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type disks: list of L{objects.Disk} or None
  @param disks: which disks to assemble (or all, if None)
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: False if the operation failed, otherwise a list of
      (host, instance_visible_name, node_visible_name)
      with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  disks = ExpandCheckDisks(instance, disks)

  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for idx, inst_disk in enumerate(disks):
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
                                             False, idx)
      msg = result.fail_msg
      if msg:
        is_offline_secondary = (node in instance.secondary_nodes and
                                result.offline)
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=False, pass=1): %s",
                      inst_disk.iv_name, node, msg)
        if not (ignore_secondaries or is_offline_secondary):
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for idx, inst_disk in enumerate(disks):
    dev_path = None

    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
                                             True, idx)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=True, pass=2): %s",
                      inst_disk.iv_name, node, msg)
        disks_ok = False
      else:
        dev_path = result.payload

    device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
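
# Illustrative example (hypothetical values): on success the function returns
# something like
#   (True, [("node1.example.com", "disk/0", "/dev/drbd0")])
# i.e. disks_ok plus one (primary node, iv_name, device path) triple per disk;
# dev_path may be None for a disk whose primary-node assembly failed.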
1296

    
1297

    
1298
def StartInstanceDisks(lu, instance, force):
1299
  """Start the disks of an instance.
1300

1301
  """
1302
  disks_ok, _ = AssembleInstanceDisks(lu, instance,
1303
                                      ignore_secondaries=force)
1304
  if not disks_ok:
1305
    ShutdownInstanceDisks(lu, instance)
1306
    if force is not None and not force:
1307
      lu.LogWarning("",
1308
                    hint=("If the message above refers to a secondary node,"
1309
                          " you can retry the operation using '--force'"))
1310
    raise errors.OpExecError("Disk consistency error")
1311

    
1312

    
1313
class LUInstanceGrowDisk(LogicalUnit):
1314
  """Grow a disk of an instance.
1315

1316
  """
1317
  HPATH = "disk-grow"
1318
  HTYPE = constants.HTYPE_INSTANCE
1319
  REQ_BGL = False
1320

    
1321
  def ExpandNames(self):
1322
    self._ExpandAndLockInstance()
1323
    self.needed_locks[locking.LEVEL_NODE] = []
1324
    self.needed_locks[locking.LEVEL_NODE_RES] = []
1325
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1326
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
1327

    
1328
  def DeclareLocks(self, level):
1329
    if level == locking.LEVEL_NODE:
1330
      self._LockInstancesNodes()
1331
    elif level == locking.LEVEL_NODE_RES:
1332
      # Copy node locks
1333
      self.needed_locks[locking.LEVEL_NODE_RES] = \
1334
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1335

    
1336
  def BuildHooksEnv(self):
1337
    """Build hooks env.
1338

1339
    This runs on the master, the primary and all the secondaries.
1340

1341
    """
1342
    env = {
1343
      "DISK": self.op.disk,
1344
      "AMOUNT": self.op.amount,
1345
      "ABSOLUTE": self.op.absolute,
1346
      }
1347
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
1348
    return env
1349

    
1350
  def BuildHooksNodes(self):
1351
    """Build hooks nodes.
1352

1353
    """
1354
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1355
    return (nl, nl)
1356

    
1357
  def CheckPrereq(self):
1358
    """Check prerequisites.
1359

1360
    This checks that the instance is in the cluster.
1361

1362
    """
1363
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1364
    assert instance is not None, \
1365
      "Cannot retrieve locked instance %s" % self.op.instance_name
1366
    nodenames = list(instance.all_nodes)
1367
    for node in nodenames:
1368
      CheckNodeOnline(self, node)
1369

    
1370
    self.instance = instance
1371

    
1372
    if instance.disk_template not in constants.DTS_GROWABLE:
1373
      raise errors.OpPrereqError("Instance's disk layout does not support"
1374
                                 " growing", errors.ECODE_INVAL)
1375

    
1376
    self.disk = instance.FindDisk(self.op.disk)
1377

    
1378
    if self.op.absolute:
1379
      self.target = self.op.amount
1380
      self.delta = self.target - self.disk.size
1381
      if self.delta < 0:
1382
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
1383
                                   "current disk size (%s)" %
1384
                                   (utils.FormatUnit(self.target, "h"),
1385
                                    utils.FormatUnit(self.disk.size, "h")),
1386
                                   errors.ECODE_STATE)
1387
    else:
1388
      self.delta = self.op.amount
1389
      self.target = self.disk.size + self.delta
1390
      if self.delta < 0:
1391
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
1392
                                   utils.FormatUnit(self.delta, "h"),
1393
                                   errors.ECODE_INVAL)
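    # Worked example (illustrative): growing a 10240 MiB disk with
    # amount=2048 and absolute=False gives delta=2048 and target=12288;
    # amount=12288 with absolute=True reaches the same target, while any
    # absolute value below 10240 is rejected above.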
    self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))
  def _CheckDiskSpace(self, nodenames, req_vgspace):
1398
    template = self.instance.disk_template
1399
    if template not in constants.DTS_NO_FREE_SPACE_CHECK:
1400
      # TODO: check the free disk space for file, when that feature will be
1401
      # supported
1402
      nodes = map(self.cfg.GetNodeInfo, nodenames)
1403
      es_nodes = filter(lambda n: IsExclusiveStorageEnabledNode(self.cfg, n),
1404
                        nodes)
1405
      if es_nodes:
        # With exclusive storage we need to do something smarter than just
        # looking at free space; for now, let's simply abort the operation.
        raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
                                   " is enabled", errors.ECODE_STATE)
      CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)
1411

    
1412
  def Exec(self, feedback_fn):
1413
    """Execute disk grow.
1414

1415
    """
1416
    instance = self.instance
1417
    disk = self.disk
1418

    
1419
    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1420
    assert (self.owned_locks(locking.LEVEL_NODE) ==
1421
            self.owned_locks(locking.LEVEL_NODE_RES))
1422

    
1423
    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1424

    
1425
    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[disk])
1426
    if not disks_ok:
1427
      raise errors.OpExecError("Cannot activate block device to grow")
1428

    
1429
    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1430
                (self.op.disk, instance.name,
1431
                 utils.FormatUnit(self.delta, "h"),
1432
                 utils.FormatUnit(self.target, "h")))
1433

    
1434
    # First run all grow ops in dry-run mode
1435
    for node in instance.all_nodes:
1436
      self.cfg.SetDiskID(disk, node)
1437
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1438
                                           True, True)
1439
      result.Raise("Dry-run grow request failed to node %s" % node)
1440

    
1441
    if wipe_disks:
1442
      # Get disk size from primary node for wiping
1443
      result = self.rpc.call_blockdev_getdimensions(instance.primary_node,
1444
                                                    [disk])
1445
      result.Raise("Failed to retrieve disk size from node '%s'" %
1446
                   instance.primary_node)
1447

    
1448
      (disk_dimensions, ) = result.payload
1449

    
1450
      if disk_dimensions is None:
1451
        raise errors.OpExecError("Failed to retrieve disk size from primary"
1452
                                 " node '%s'" % instance.primary_node)
1453
      (disk_size_in_bytes, _) = disk_dimensions
      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
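      # Illustrative conversion: a 10 GiB volume reported as 10737418240
      # bytes becomes 10240 MiB here (1 MiB = 1048576 bytes).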
      assert old_disk_size >= disk.size, \
1458
        ("Retrieved disk size too small (got %s, should be at least %s)" %
1459
         (old_disk_size, disk.size))
1460
    else:
1461
      old_disk_size = None
1462

    
1463
    # We know that (as far as we can test) operations across different
1464
    # nodes will succeed, time to run it for real on the backing storage
1465
    for node in instance.all_nodes:
1466
      self.cfg.SetDiskID(disk, node)
1467
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1468
                                           False, True)
1469
      result.Raise("Grow request failed to node %s" % node)
1470

    
1471
    # And now execute it for logical storage, on the primary node
1472
    node = instance.primary_node
1473
    self.cfg.SetDiskID(disk, node)
1474
    result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1475
                                         False, False)
1476
    result.Raise("Grow request failed to node %s" % node)
1477

    
1478
    disk.RecordGrow(self.delta)
1479
    self.cfg.Update(instance, feedback_fn)
1480

    
1481
    # Changes have been recorded, release node lock
1482
    ReleaseLocks(self, locking.LEVEL_NODE)
1483

    
1484
    # Downgrade lock while waiting for sync
1485
    self.glm.downgrade(locking.LEVEL_INSTANCE)
1486

    
1487
    assert wipe_disks ^ (old_disk_size is None)
1488

    
1489
    if wipe_disks:
1490
      assert instance.disks[self.op.disk] == disk
1491

    
1492
      # Wipe newly added disk space
1493
      WipeDisks(self, instance,
1494
                disks=[(self.op.disk, disk, old_disk_size)])
1495

    
1496
    if self.op.wait_for_sync:
1497
      disk_abort = not WaitForSync(self, instance, disks=[disk])
1498
      if disk_abort:
1499
        self.LogWarning("Disk syncing has not returned a good status; check"
1500
                        " the instance")
1501
      if instance.admin_state != constants.ADMINST_UP:
1502
        _SafeShutdownInstanceDisks(self, instance, disks=[disk])
1503
    elif instance.admin_state != constants.ADMINST_UP:
1504
      self.LogWarning("Not shutting down the disk even if the instance is"
1505
                      " not supposed to be running because no wait for"
1506
                      " sync mode was requested")
1507

    
1508
    assert self.owned_locks(locking.LEVEL_NODE_RES)
1509
    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1510

    
1511

    
1512
class LUInstanceReplaceDisks(LogicalUnit):
1513
  """Replace the disks of an instance.
1514

1515
  """
1516
  HPATH = "mirrors-replace"
1517
  HTYPE = constants.HTYPE_INSTANCE
1518
  REQ_BGL = False
1519

    
1520
  def CheckArguments(self):
1521
    """Check arguments.
1522

1523
    """
1524
    remote_node = self.op.remote_node
1525
    ialloc = self.op.iallocator
1526
    if self.op.mode == constants.REPLACE_DISK_CHG:
1527
      if remote_node is None and ialloc is None:
1528
        raise errors.OpPrereqError("When changing the secondary either an"
1529
                                   " iallocator script must be used or the"
1530
                                   " new node given", errors.ECODE_INVAL)
1531
      else:
1532
        CheckIAllocatorOrNode(self, "iallocator", "remote_node")
1533

    
1534
    elif remote_node is not None or ialloc is not None:
1535
      # Not replacing the secondary
1536
      raise errors.OpPrereqError("The iallocator and new node options can"
1537
                                 " only be used when changing the"
1538
                                 " secondary node", errors.ECODE_INVAL)
  def ExpandNames(self):
1541
    self._ExpandAndLockInstance()
1542

    
1543
    assert locking.LEVEL_NODE not in self.needed_locks
1544
    assert locking.LEVEL_NODE_RES not in self.needed_locks
1545
    assert locking.LEVEL_NODEGROUP not in self.needed_locks
1546

    
1547
    assert self.op.iallocator is None or self.op.remote_node is None, \
1548
      "Conflicting options"
1549

    
1550
    if self.op.remote_node is not None:
1551
      self.op.remote_node = ExpandNodeName(self.cfg, self.op.remote_node)
1552

    
1553
      # Warning: do not remove the locking of the new secondary here
1554
      # unless DRBD8Dev.AddChildren is changed to work in parallel;
1555
      # currently it doesn't since parallel invocations of
1556
      # FindUnusedMinor will conflict
1557
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
1558
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1559
    else:
1560
      self.needed_locks[locking.LEVEL_NODE] = []
1561
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1562

    
1563
      if self.op.iallocator is not None:
1564
        # iallocator will select a new node in the same group
1565
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
1566
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1567

    
1568
    self.needed_locks[locking.LEVEL_NODE_RES] = []
1569

    
1570
    self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
1571
                                   self.op.iallocator, self.op.remote_node,
1572
                                   self.op.disks, self.op.early_release,
1573
                                   self.op.ignore_ipolicy)
1574

    
1575
    self.tasklets = [self.replacer]
1576

    
1577
  def DeclareLocks(self, level):
1578
    if level == locking.LEVEL_NODEGROUP:
1579
      assert self.op.remote_node is None
1580
      assert self.op.iallocator is not None
1581
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
1582

    
1583
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
1584
      # Lock all groups used by instance optimistically; this requires going
1585
      # via the node before it's locked, requiring verification later on
1586
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
1587
        self.cfg.GetInstanceNodeGroups(self.op.instance_name)
1588

    
1589
    elif level == locking.LEVEL_NODE:
1590
      if self.op.iallocator is not None:
1591
        assert self.op.remote_node is None
1592
        assert not self.needed_locks[locking.LEVEL_NODE]
1593
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
1594

    
1595
        # Lock member nodes of all locked groups
1596
        self.needed_locks[locking.LEVEL_NODE] = \
1597
          [node_name
1598
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1599
           for node_name in self.cfg.GetNodeGroup(group_uuid).members]
1600
      else:
1601
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1602

    
1603
        self._LockInstancesNodes()
1604

    
1605
    elif level == locking.LEVEL_NODE_RES:
1606
      # Reuse node locks
1607
      self.needed_locks[locking.LEVEL_NODE_RES] = \
1608
        self.needed_locks[locking.LEVEL_NODE]
1609

    
1610
  def BuildHooksEnv(self):
1611
    """Build hooks env.
1612

1613
    This runs on the master, the primary and all the secondaries.
1614

1615
    """
1616
    instance = self.replacer.instance
1617
    env = {
1618
      "MODE": self.op.mode,
1619
      "NEW_SECONDARY": self.op.remote_node,
1620
      "OLD_SECONDARY": instance.secondary_nodes[0],
1621
      }
1622
    env.update(BuildInstanceHookEnvByObject(self, instance))
1623
    return env
1624

    
1625
  def BuildHooksNodes(self):
1626
    """Build hooks nodes.
1627

1628
    """
1629
    instance = self.replacer.instance
1630
    nl = [
1631
      self.cfg.GetMasterNode(),
1632
      instance.primary_node,
1633
      ]
1634
    if self.op.remote_node is not None:
1635
      nl.append(self.op.remote_node)
1636
    return nl, nl
1637

    
1638
  def CheckPrereq(self):
1639
    """Check prerequisites.
1640

1641
    """
1642
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
1643
            self.op.iallocator is None)
1644

    
1645
    # Verify if node group locks are still correct
1646
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
1647
    if owned_groups:
1648
      CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
1649

    
1650
    return LogicalUnit.CheckPrereq(self)
1651

    
1652

    
1653
class LUInstanceActivateDisks(NoHooksLU):
1654
  """Bring up an instance's disks.
1655

1656
  """
1657
  REQ_BGL = False
1658

    
1659
  def ExpandNames(self):
1660
    self._ExpandAndLockInstance()
1661
    self.needed_locks[locking.LEVEL_NODE] = []
1662
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1663

    
1664
  def DeclareLocks(self, level):
1665
    if level == locking.LEVEL_NODE:
1666
      self._LockInstancesNodes()
1667

    
1668
  def CheckPrereq(self):
1669
    """Check prerequisites.
1670

1671
    This checks that the instance is in the cluster.
1672

1673
    """
1674
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1675
    assert self.instance is not None, \
1676
      "Cannot retrieve locked instance %s" % self.op.instance_name
1677
    CheckNodeOnline(self, self.instance.primary_node)
1678

    
1679
  def Exec(self, feedback_fn):
1680
    """Activate the disks.
1681

1682
    """
1683
    disks_ok, disks_info = \
1684
              AssembleInstanceDisks(self, self.instance,
1685
                                    ignore_size=self.op.ignore_size)
1686
    if not disks_ok:
1687
      raise errors.OpExecError("Cannot activate block devices")
1688

    
1689
    if self.op.wait_for_sync:
1690
      if not WaitForSync(self, self.instance):
1691
        raise errors.OpExecError("Some disks of the instance are degraded!")
1692

    
1693
    return disks_info
1694

    
1695

    
1696
class LUInstanceDeactivateDisks(NoHooksLU):
1697
  """Shutdown an instance's disks.
1698

1699
  """
1700
  REQ_BGL = False
1701

    
1702
  def ExpandNames(self):
1703
    self._ExpandAndLockInstance()
1704
    self.needed_locks[locking.LEVEL_NODE] = []
1705
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1706

    
1707
  def DeclareLocks(self, level):
1708
    if level == locking.LEVEL_NODE:
1709
      self._LockInstancesNodes()
1710

    
1711
  def CheckPrereq(self):
1712
    """Check prerequisites.
1713

1714
    This checks that the instance is in the cluster.
1715

1716
    """
1717
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1718
    assert self.instance is not None, \
1719
      "Cannot retrieve locked instance %s" % self.op.instance_name
1720

    
1721
  def Exec(self, feedback_fn):
1722
    """Deactivate the disks
1723

1724
    """
1725
    instance = self.instance
1726
    if self.op.force:
1727
      ShutdownInstanceDisks(self, instance)
1728
    else:
      _SafeShutdownInstanceDisks(self, instance)
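      # Illustrative difference: _SafeShutdownInstanceDisks first checks that
      # the instance is not marked as running, while the --force branch above
      # shuts the disks down unconditionally.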


def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
1733
                               ldisk=False):
1734
  """Check that mirrors are not degraded.
1735

1736
  @attention: The device has to be annotated already.
1737

1738
  The ldisk parameter, if True, will change the test from the
1739
  is_degraded attribute (which represents overall non-ok status for
1740
  the device(s)) to the ldisk (representing the local storage status).
1741

  """
  lu.cfg.SetDiskID(dev, node)
1744

    
1745
  result = True
1746

    
1747
  if on_primary or dev.AssembleOnSecondary():
1748
    rstats = lu.rpc.call_blockdev_find(node, dev)
1749
    msg = rstats.fail_msg
1750
    if msg:
1751
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1752
      result = False
1753
    elif not rstats.payload:
1754
      lu.LogWarning("Can't find disk on node %s", node)
1755
      result = False
1756
    else:
1757
      if ldisk:
1758
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
1759
      else:
1760
        result = result and not rstats.payload.is_degraded
1761

    
1762
  if dev.children:
1763
    for child in dev.children:
1764
      result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
1765
                                                     on_primary)
1766

    
1767
  return result
1768

    
1769

    
1770
def CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
1771
  """Wrapper around L{_CheckDiskConsistencyInner}.
1772

1773
  """
1774
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1775
  return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
1776
                                    ldisk=ldisk)
1777

    
1778

    
1779
def _BlockdevFind(lu, node, dev, instance):
1780
  """Wrapper around call_blockdev_find to annotate diskparams.
1781

1782
  @param lu: A reference to the lu object
1783
  @param node: The node to call out
1784
  @param dev: The device to find
1785
  @param instance: The instance object the device belongs to
1786
  @returns The result of the rpc call
1787

1788
  """
1789
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1790
  return lu.rpc.call_blockdev_find(node, disk)
1791

    
1792

    
1793
def _GenerateUniqueNames(lu, exts):
1794
  """Generate a suitable LV name.
1795

1796
  This will generate a logical volume name for the given instance.
1797

  """
  results = []
1800
  for val in exts:
1801
    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
1802
    results.append("%s%s" % (new_id, val))
1803
  return results
1804

    
1805

    
1806
class TLReplaceDisks(Tasklet):
1807
  """Replaces disks for an instance.
1808

1809
  Note: Locking is not within the scope of this class.
1810

  """
  def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
1813
               disks, early_release, ignore_ipolicy):
1814
    """Initializes this class.
1815

1816
    """
1817
    Tasklet.__init__(self, lu)
1818

    
1819
    # Parameters
1820
    self.instance_name = instance_name
1821
    self.mode = mode
1822
    self.iallocator_name = iallocator_name
1823
    self.remote_node = remote_node
1824
    self.disks = disks
1825
    self.early_release = early_release
1826
    self.ignore_ipolicy = ignore_ipolicy
1827

    
1828
    # Runtime data
1829
    self.instance = None
1830
    self.new_node = None
1831
    self.target_node = None
1832
    self.other_node = None
1833
    self.remote_node_info = None
1834
    self.node_secondary_ip = None
1835

    
1836
  @staticmethod
1837
  def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
1838
    """Compute a new secondary node using an IAllocator.
1839

1840
    """
1841
    req = iallocator.IAReqRelocate(name=instance_name,
1842
                                   relocate_from=list(relocate_from))
1843
    ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
1844

    
1845
    ial.Run(iallocator_name)
1846

    
1847
    if not ial.success:
1848
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
1849
                                 " %s" % (iallocator_name, ial.info),
1850
                                 errors.ECODE_NORES)
1851

    
1852
    remote_node_name = ial.result[0]
1853

    
1854
    lu.LogInfo("Selected new secondary for instance '%s': %s",
1855
               instance_name, remote_node_name)
1856

    
1857
    return remote_node_name
1858

    
1859
  def _FindFaultyDisks(self, node_name):
1860
    """Wrapper for L{FindFaultyInstanceDisks}.
1861

1862
    """
1863
    return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
1864
                                   node_name, True)
1865

    
1866
  def _CheckDisksActivated(self, instance):
1867
    """Checks if the instance disks are activated.
1868

1869
    @param instance: The instance to check disks
1870
    @return: True if they are activated, False otherwise
1871

1872
    """
1873
    nodes = instance.all_nodes
1874

    
1875
    for idx, dev in enumerate(instance.disks):
1876
      for node in nodes:
1877
        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
1878
        self.cfg.SetDiskID(dev, node)
1879

    
1880
        result = _BlockdevFind(self, node, dev, instance)
1881

    
1882
        if result.offline:
1883
          continue
1884
        elif result.fail_msg or not result.payload:
1885
          return False
1886

    
1887
    return True
1888

    
1889
  def CheckPrereq(self):
1890
    """Check prerequisites.
1891

1892
    This checks that the instance is in the cluster.
1893

1894
    """
1895
    self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
1896
    assert instance is not None, \
1897
      "Cannot retrieve locked instance %s" % self.instance_name
1898

    
1899
    if instance.disk_template != constants.DT_DRBD8:
1900
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
1901
                                 " instances", errors.ECODE_INVAL)
1902

    
1903
    if len(instance.secondary_nodes) != 1:
1904
      raise errors.OpPrereqError("The instance has a strange layout,"
1905
                                 " expected one secondary but found %d" %
1906
                                 len(instance.secondary_nodes),
1907
                                 errors.ECODE_FAULT)
1908

    
1909
    instance = self.instance
1910
    secondary_node = instance.secondary_nodes[0]
1911

    
1912
    if self.iallocator_name is None:
1913
      remote_node = self.remote_node
1914
    else:
1915
      remote_node = self._RunAllocator(self.lu, self.iallocator_name,
1916
                                       instance.name, instance.secondary_nodes)
1917

    
1918
    if remote_node is None:
1919
      self.remote_node_info = None
1920
    else:
1921
      assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \
1922
             "Remote node '%s' is not locked" % remote_node
1923

    
1924
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
1925
      assert self.remote_node_info is not None, \
1926
        "Cannot retrieve locked node %s" % remote_node
1927

    
1928
    if remote_node == self.instance.primary_node:
1929
      raise errors.OpPrereqError("The specified node is the primary node of"
1930
                                 " the instance", errors.ECODE_INVAL)
1931

    
1932
    if remote_node == secondary_node:
1933
      raise errors.OpPrereqError("The specified node is already the"
1934
                                 " secondary node of the instance",
1935
                                 errors.ECODE_INVAL)
1936

    
1937
    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
1938
                                    constants.REPLACE_DISK_CHG):
1939
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
1940
                                 errors.ECODE_INVAL)
1941

    
1942
    if self.mode == constants.REPLACE_DISK_AUTO:
1943
      if not self._CheckDisksActivated(instance):
1944
        raise errors.OpPrereqError("Please run activate-disks on instance %s"
1945
                                   " first" % self.instance_name,
1946
                                   errors.ECODE_STATE)
1947
      faulty_primary = self._FindFaultyDisks(instance.primary_node)
1948
      faulty_secondary = self._FindFaultyDisks(secondary_node)
1949

    
1950
      if faulty_primary and faulty_secondary:
1951
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
1952
                                   " one node and can not be repaired"
1953
                                   " automatically" % self.instance_name,
1954
                                   errors.ECODE_STATE)
1955

    
1956
      if faulty_primary:
1957
        self.disks = faulty_primary
1958
        self.target_node = instance.primary_node
1959
        self.other_node = secondary_node
1960
        check_nodes = [self.target_node, self.other_node]
1961
      elif faulty_secondary:
1962
        self.disks = faulty_secondary
1963
        self.target_node = secondary_node
1964
        self.other_node = instance.primary_node
1965
        check_nodes = [self.target_node, self.other_node]
1966
      else:
1967
        self.disks = []
1968
        check_nodes = []
1969

    
1970
    else:
1971
      # Non-automatic modes
1972
      if self.mode == constants.REPLACE_DISK_PRI:
1973
        self.target_node = instance.primary_node
1974
        self.other_node = secondary_node
1975
        check_nodes = [self.target_node, self.other_node]
1976

    
1977
      elif self.mode == constants.REPLACE_DISK_SEC:
1978
        self.target_node = secondary_node
1979
        self.other_node = instance.primary_node
1980
        check_nodes = [self.target_node, self.other_node]
1981

    
1982
      elif self.mode == constants.REPLACE_DISK_CHG:
1983
        self.new_node = remote_node
1984
        self.other_node = instance.primary_node
1985
        self.target_node = secondary_node
1986
        check_nodes = [self.new_node, self.other_node]
1987

    
1988
        CheckNodeNotDrained(self.lu, remote_node)
1989
        CheckNodeVmCapable(self.lu, remote_node)
1990

    
1991
        old_node_info = self.cfg.GetNodeInfo(secondary_node)
1992
        assert old_node_info is not None
1993
        if old_node_info.offline and not self.early_release:
1994
          # doesn't make sense to delay the release
1995
          self.early_release = True
1996
          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
1997
                          " early-release mode", secondary_node)
1998

    
1999
      else:
2000
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
2001
                                     self.mode)
2002

    
2003
      # If not specified all disks should be replaced
2004
      if not self.disks:
2005
        self.disks = range(len(self.instance.disks))
2006

    
2007
    # TODO: This is ugly, but right now we can't distinguish between internal
2008
    # submitted opcode and external one. We should fix that.
2009
    if self.remote_node_info:
2010
      # We change the node, lets verify it still meets instance policy
2011
      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
2012
      cluster = self.cfg.GetClusterInfo()
2013
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2014
                                                              new_group_info)
2015
      CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
2016
                             self.cfg, ignore=self.ignore_ipolicy)
2017

    
2018
    for node in check_nodes:
2019
      CheckNodeOnline(self.lu, node)
2020

    
2021
    touched_nodes = frozenset(node_name for node_name in [self.new_node,
2022
                                                          self.other_node,
2023
                                                          self.target_node]
2024
                              if node_name is not None)
2025

    
2026
    # Release unneeded node and node resource locks
2027
    ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
2028
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
2029
    ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
2030

    
2031
    # Release any owned node group
2032
    ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
2033

    
2034
    # Check whether disks are valid
2035
    for disk_idx in self.disks:
2036
      instance.FindDisk(disk_idx)
2037

    
2038
    # Get secondary node IP addresses
2039
    self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node)
2040
                                  in self.cfg.GetMultiNodeInfo(touched_nodes))
2041

    
2042
  def Exec(self, feedback_fn):
2043
    """Execute disk replacement.
2044

2045
    This dispatches the disk replacement to the appropriate handler.
2046

2047
    """
2048
    if __debug__:
2049
      # Verify owned locks before starting operation
2050
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
2051
      assert set(owned_nodes) == set(self.node_secondary_ip), \
2052
          ("Incorrect node locks, owning %s, expected %s" %
2053
           (owned_nodes, self.node_secondary_ip.keys()))
2054
      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
2055
              self.lu.owned_locks(locking.LEVEL_NODE_RES))
2056
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
2057

    
2058
      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
2059
      assert list(owned_instances) == [self.instance_name], \
2060
          "Instance '%s' not locked" % self.instance_name
2061

    
2062
      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
2063
          "Should not own any node group lock at this point"
2064

    
2065
    if not self.disks:
2066
      feedback_fn("No disks need replacement for instance '%s'" %
2067
                  self.instance.name)
2068
      return
2069

    
2070
    feedback_fn("Replacing disk(s) %s for instance '%s'" %
2071
                (utils.CommaJoin(self.disks), self.instance.name))
2072
    feedback_fn("Current primary node: %s" % self.instance.primary_node)
2073
    feedback_fn("Current seconary node: %s" %
2074
                utils.CommaJoin(self.instance.secondary_nodes))
2075

    
2076
    activate_disks = (self.instance.admin_state != constants.ADMINST_UP)
2077

    
2078
    # Activate the instance disks if we're replacing them on a down instance
2079
    if activate_disks:
2080
      StartInstanceDisks(self.lu, self.instance, True)
2081

    
2082
    try:
2083
      # Should we replace the secondary node?
2084
      if self.new_node is not None:
2085
        fn = self._ExecDrbd8Secondary
2086
      else:
2087
        fn = self._ExecDrbd8DiskOnly
2088

    
2089
      result = fn(feedback_fn)
2090
    finally:
2091
      # Deactivate the instance disks if we're replacing them on a
2092
      # down instance
2093
      if activate_disks:
2094
        _SafeShutdownInstanceDisks(self.lu, self.instance)
2095

    
2096
    assert not self.lu.owned_locks(locking.LEVEL_NODE)
2097

    
2098
    if __debug__:
2099
      # Verify owned locks
2100
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
2101
      nodes = frozenset(self.node_secondary_ip)
2102
      assert ((self.early_release and not owned_nodes) or
2103
              (not self.early_release and not (set(owned_nodes) - nodes))), \
2104
        ("Not owning the correct locks, early_release=%s, owned=%r,"
2105
         " nodes=%r" % (self.early_release, owned_nodes, nodes))
2106

    
2107
    return result
2108

    
2109
  def _CheckVolumeGroup(self, nodes):
2110
    self.lu.LogInfo("Checking volume groups")
2111

    
2112
    vgname = self.cfg.GetVGName()
2113

    
2114
    # Make sure volume group exists on all involved nodes
2115
    results = self.rpc.call_vg_list(nodes)
2116
    if not results:
2117
      raise errors.OpExecError("Can't list volume groups on the nodes")
2118

    
2119
    for node in nodes:
2120
      res = results[node]
2121
      res.Raise("Error checking node %s" % node)
2122
      if vgname not in res.payload:
2123
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
2124
                                 (vgname, node))
2125

    
2126
  def _CheckDisksExistence(self, nodes):
2127
    # Check disk existence
2128
    for idx, dev in enumerate(self.instance.disks):
2129
      if idx not in self.disks:
2130
        continue
2131

    
2132
      for node in nodes:
2133
        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
2134
        self.cfg.SetDiskID(dev, node)
2135

    
2136
        result = _BlockdevFind(self, node, dev, self.instance)
2137

    
2138
        msg = result.fail_msg
2139
        if msg or not result.payload:
2140
          if not msg:
2141
            msg = "disk not found"
2142
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
2143
                                   (idx, node, msg))
2144

    
2145
  def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
2146
    for idx, dev in enumerate(self.instance.disks):
2147
      if idx not in self.disks:
2148
        continue
2149

    
2150
      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
2151
                      (idx, node_name))
2152

    
2153
      if not CheckDiskConsistency(self.lu, self.instance, dev, node_name,
2154
                                  on_primary, ldisk=ldisk):
2155
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
2156
                                 " replace disks for instance %s" %
2157
                                 (node_name, self.instance.name))
2158

    
2159
  def _CreateNewStorage(self, node_name):
2160
    """Create new storage on the primary or secondary node.
2161

2162
    This is only used for same-node replaces, not for changing the
2163
    secondary node, hence we don't want to modify the existing disk.
2164

2165
    """
2166
    iv_names = {}
2167

    
2168
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2169
    for idx, dev in enumerate(disks):
2170
      if idx not in self.disks:
2171
        continue
2172

    
2173
      self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx)
2174

    
2175
      self.cfg.SetDiskID(dev, node_name)
2176

    
2177
      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
2178
      names = _GenerateUniqueNames(self.lu, lv_names)
2179

    
2180
      (data_disk, meta_disk) = dev.children
2181
      vg_data = data_disk.logical_id[0]
2182
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
2183
                             logical_id=(vg_data, names[0]),
2184
                             params=data_disk.params)
2185
      vg_meta = meta_disk.logical_id[0]
2186
      lv_meta = objects.Disk(dev_type=constants.LD_LV,
2187
                             size=constants.DRBD_META_SIZE,
2188
                             logical_id=(vg_meta, names[1]),
2189
                             params=meta_disk.params)
2190

    
2191
      new_lvs = [lv_data, lv_meta]
2192
      old_lvs = [child.Copy() for child in dev.children]
2193
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
2194
      excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)
2195

    
2196
      # we pass force_create=True to force the LVM creation
2197
      for new_lv in new_lvs:
2198
        _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
2199
                             GetInstanceInfoText(self.instance), False,
2200
                             excl_stor)
2201

    
2202
    return iv_names
2203

    
2204
  def _CheckDevices(self, node_name, iv_names):
2205
    for name, (dev, _, _) in iv_names.iteritems():
2206
      self.cfg.SetDiskID(dev, node_name)
2207

    
2208
      result = _BlockdevFind(self, node_name, dev, self.instance)
2209

    
2210
      msg = result.fail_msg
2211
      if msg or not result.payload:
2212
        if not msg:
2213
          msg = "disk not found"
2214
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
2215
                                 (name, msg))
2216

    
2217
      if result.payload.is_degraded:
2218
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
2219

    
2220
  def _RemoveOldStorage(self, node_name, iv_names):
2221
    for name, (_, old_lvs, _) in iv_names.iteritems():
2222
      self.lu.LogInfo("Remove logical volumes for %s", name)
2223

    
2224
      for lv in old_lvs:
2225
        self.cfg.SetDiskID(lv, node_name)
2226

    
2227
        msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
2228
        if msg:
2229
          self.lu.LogWarning("Can't remove old LV: %s", msg,
2230
                             hint="remove unused LVs manually")
2231

    
2232
  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
2233
    """Replace a disk on the primary or secondary for DRBD 8.
2234

2235
    The algorithm for replace is quite complicated:
2236

2237
      1. for each disk to be replaced:
2238

2239
        1. create new LVs on the target node with unique names
2240
        1. detach old LVs from the drbd device
2241
        1. rename old LVs to name_replaced.<time_t>
2242
        1. rename new LVs to old LVs
2243
        1. attach the new LVs (with the old names now) to the drbd device
2244

2245
      1. wait for sync across all devices
2246

2247
      1. for each modified disk:
2248

2249
        1. remove old LVs (which have the name name_replaces.<time_t>)
2250

2251
    Failures are not very well handled.
2252

    """
    steps_total = 6
2255

    
2256
    # Step: check device activation
2257
    self.lu.LogStep(1, steps_total, "Check device existence")
2258
    self._CheckDisksExistence([self.other_node, self.target_node])
2259
    self._CheckVolumeGroup([self.target_node, self.other_node])
2260

    
2261
    # Step: check other node consistency
2262
    self.lu.LogStep(2, steps_total, "Check peer consistency")
2263
    self._CheckDisksConsistency(self.other_node,
2264
                                self.other_node == self.instance.primary_node,
2265
                                False)
2266

    
2267
    # Step: create new storage
2268
    self.lu.LogStep(3, steps_total, "Allocate new storage")
2269
    iv_names = self._CreateNewStorage(self.target_node)
2270

    
2271
    # Step: for each lv, detach+rename*2+attach
2272
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2273
    for dev, old_lvs, new_lvs in iv_names.itervalues():
2274
      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
2275

    
2276
      result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
2277
                                                     old_lvs)
2278
      result.Raise("Can't detach drbd from local storage on node"
2279
                   " %s for device %s" % (self.target_node, dev.iv_name))
2280
      #dev.children = []
2281
      #cfg.Update(instance)
2282

    
2283
      # ok, we created the new LVs, so now we know we have the needed
2284
      # storage; as such, we proceed on the target node to rename
2285
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
2286
      # using the assumption that logical_id == physical_id (which in
2287
      # turn is the unique_id on that node)
2288

    
2289
      # FIXME(iustin): use a better name for the replaced LVs
2290
      temp_suffix = int(time.time())
2291
      ren_fn = lambda d, suff: (d.physical_id[0],
2292
                                d.physical_id[1] + "_replaced-%s" % suff)
2293

    
2294
      # Build the rename list based on what LVs exist on the node
2295
      rename_old_to_new = []
2296
      for to_ren in old_lvs:
2297
        result = self.rpc.call_blockdev_find(self.target_node, to_ren)
2298
        if not result.fail_msg and result.payload:
2299
          # device exists
2300
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
2301

    
2302
      self.lu.LogInfo("Renaming the old LVs on the target node")
2303
      result = self.rpc.call_blockdev_rename(self.target_node,
2304
                                             rename_old_to_new)
2305
      result.Raise("Can't rename old LVs on node %s" % self.target_node)
2306

    
2307
      # Now we rename the new LVs to the old LVs
2308
      self.lu.LogInfo("Renaming the new LVs on the target node")
2309
      rename_new_to_old = [(new, old.physical_id)
2310
                           for old, new in zip(old_lvs, new_lvs)]
2311
      result = self.rpc.call_blockdev_rename(self.target_node,
2312
                                             rename_new_to_old)
2313
      result.Raise("Can't rename new LVs on node %s" % self.target_node)
2314

    
2315
      # Intermediate steps of in memory modifications
2316
      for old, new in zip(old_lvs, new_lvs):
2317
        new.logical_id = old.logical_id
2318
        self.cfg.SetDiskID(new, self.target_node)
2319

    
2320
      # We need to modify old_lvs so that removal later removes the
2321
      # right LVs, not the newly added ones; note that old_lvs is a
2322
      # copy here
2323
      for disk in old_lvs:
2324
        disk.logical_id = ren_fn(disk, temp_suffix)
2325
        self.cfg.SetDiskID(disk, self.target_node)
2326

    
2327
      # Now that the new lvs have the old name, we can add them to the device
2328
      self.lu.LogInfo("Adding new mirror component on %s", self.target_node)
2329
      result = self.rpc.call_blockdev_addchildren(self.target_node,
2330
                                                  (dev, self.instance), new_lvs)
2331
      msg = result.fail_msg
2332
      if msg:
2333
        for new_lv in new_lvs:
2334
          msg2 = self.rpc.call_blockdev_remove(self.target_node,
2335
                                               new_lv).fail_msg
2336
          if msg2:
2337
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
2338
                               hint=("cleanup manually the unused logical"
2339
                                     "volumes"))
2340
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
2341

    
2342
    cstep = itertools.count(5)
2343

    
2344
    if self.early_release:
2345
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2346
      self._RemoveOldStorage(self.target_node, iv_names)
2347
      # TODO: Check if releasing locks early still makes sense
2348
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2349
    else:
2350
      # Release all resource locks except those used by the instance
2351
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2352
                   keep=self.node_secondary_ip.keys())
2353

    
2354
    # Release all node locks while waiting for sync
2355
    ReleaseLocks(self.lu, locking.LEVEL_NODE)
2356

    
2357
    # TODO: Can the instance lock be downgraded here? Take the optional disk
2358
    # shutdown in the caller into consideration.
2359

    
2360
    # Wait for sync
2361
    # This can fail as the old devices are degraded and _WaitForSync
2362
    # does a combined result over all disks, so we don't check its return value
2363
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2364
    WaitForSync(self.lu, self.instance)
2365

    
2366
    # Check all devices manually
2367
    self._CheckDevices(self.instance.primary_node, iv_names)
2368

    
2369
    # Step: remove old storage
2370
    if not self.early_release:
2371
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2372
      self._RemoveOldStorage(self.target_node, iv_names)
2373

    
2374
  def _ExecDrbd8Secondary(self, feedback_fn):
2375
    """Replace the secondary node for DRBD 8.
2376

2377
    The algorithm for replace is quite complicated:
2378
      - for all disks of the instance:
2379
        - create new LVs on the new node with same names
2380
        - shutdown the drbd device on the old secondary
2381
        - disconnect the drbd network on the primary
2382
        - create the drbd device on the new secondary
2383
        - network attach the drbd on the primary, using an artifice:
2384
          the drbd code for Attach() will connect to the network if it
2385
          finds a device which is connected to the good local disks but
2386
          not network enabled
2387
      - wait for sync across all devices
2388
      - remove all disks from the old secondary
2389

2390
    Failures are not very well handled.
2391

    """
    steps_total = 6
2394

    
2395
    pnode = self.instance.primary_node
2396

    
2397
    # Step: check device activation
2398
    self.lu.LogStep(1, steps_total, "Check device existence")
2399
    self._CheckDisksExistence([self.instance.primary_node])
2400
    self._CheckVolumeGroup([self.instance.primary_node])
2401

    
2402
    # Step: check other node consistency
2403
    self.lu.LogStep(2, steps_total, "Check peer consistency")
2404
    self._CheckDisksConsistency(self.instance.primary_node, True, True)
2405

    
2406
    # Step: create new storage
2407
    self.lu.LogStep(3, steps_total, "Allocate new storage")
2408
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2409
    excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
2410
    for idx, dev in enumerate(disks):
2411
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
2412
                      (self.new_node, idx))
2413
      # we pass force_create=True to force LVM creation
2414
      for new_lv in dev.children:
2415
        _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
2416
                             True, GetInstanceInfoText(self.instance), False,
2417
                             excl_stor)
2418

    
2419
    # Step 4: drbd minors and drbd setup changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
2422
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2423
    minors = self.cfg.AllocateDRBDMinor([self.new_node
2424
                                         for dev in self.instance.disks],
2425
                                        self.instance.name)
2426
    logging.debug("Allocated minors %r", minors)
2427

    
2428
    iv_names = {}
2429
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
2430
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
2431
                      (self.new_node, idx))
2432
      # create new devices on new_node; note that we create two IDs:
2433
      # one without port, so the drbd will be activated without
2434
      # networking information on the new node at this stage, and one
2435
      # with network, for the latter activation in step 4
2436
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
2437
      if self.instance.primary_node == o_node1:
2438
        p_minor = o_minor1
2439
      else:
2440
        assert self.instance.primary_node == o_node2, "Three-node instance?"
2441
        p_minor = o_minor2
2442

    
2443
      new_alone_id = (self.instance.primary_node, self.new_node, None,
2444
                      p_minor, new_minor, o_secret)
2445
      new_net_id = (self.instance.primary_node, self.new_node, o_port,
2446
                    p_minor, new_minor, o_secret)
2447

    
2448
      iv_names[idx] = (dev, dev.children, new_net_id)
2449
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
2450
                    new_net_id)
2451
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
2452
                              logical_id=new_alone_id,
2453
                              children=dev.children,
2454
                              size=dev.size,
2455
                              params={})
2456
      (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
2457
                                            self.cfg)
2458
      try:
2459
        CreateSingleBlockDev(self.lu, self.new_node, self.instance,
2460
                             anno_new_drbd,
2461
                             GetInstanceInfoText(self.instance), False,
2462
                             excl_stor)
2463
      except errors.GenericError:
2464
        self.cfg.ReleaseDRBDMinors(self.instance.name)
2465
        raise
2466

    
2467
    # We have new devices, shutdown the drbd on the old secondary
2468
    for idx, dev in enumerate(self.instance.disks):
2469
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
2470
      self.cfg.SetDiskID(dev, self.target_node)
2471
      msg = self.rpc.call_blockdev_shutdown(self.target_node,
2472
                                            (dev, self.instance)).fail_msg
2473
      if msg:
2474
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
2475
                           "node: %s" % (idx, msg),
2476
                           hint=("Please cleanup this device manually as"
2477
                                 " soon as possible"))
2478

    
2479
    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2480
    result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
2481
                                               self.instance.disks)[pnode]
2482

    
2483
    msg = result.fail_msg
2484
    if msg:
2485
      # detaches didn't succeed (unlikely)
2486
      self.cfg.ReleaseDRBDMinors(self.instance.name)
2487
      raise errors.OpExecError("Can't detach the disks from the network on"
2488
                               " old node: %s" % (msg,))
2489

    
2490
    # if we managed to detach at least one, we update all the disks of
2491
    # the instance to point to the new secondary
2492
    self.lu.LogInfo("Updating instance configuration")
2493
    for dev, _, new_logical_id in iv_names.itervalues():
2494
      dev.logical_id = new_logical_id
2495
      self.cfg.SetDiskID(dev, self.instance.primary_node)
2496

    
2497
    self.cfg.Update(self.instance, feedback_fn)
2498

    
2499
    # Release all node locks (the configuration has been updated)
2500
    ReleaseLocks(self.lu, locking.LEVEL_NODE)
2501

    
2502
    # and now perform the drbd attach
2503
    self.lu.LogInfo("Attaching primary drbds to new secondary"
2504
                    " (standalone => connected)")
2505
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2506
                                            self.new_node],
2507
                                           self.node_secondary_ip,
2508
                                           (self.instance.disks, self.instance),
2509
                                           self.instance.name,
2510
                                           False)
2511
    for to_node, to_result in result.items():
2512
      msg = to_result.fail_msg
2513
      if msg:
2514
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
2515
                           to_node, msg,
2516
                           hint=("please do a gnt-instance info to see the"
2517
                                 " status of disks"))
2518

    
2519
    cstep = itertools.count(5)
2520

    
2521
    if self.early_release:
2522
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2523
      self._RemoveOldStorage(self.target_node, iv_names)
2524
      # TODO: Check if releasing locks early still makes sense
2525
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2526
    else:
2527
      # Release all resource locks except those used by the instance
2528
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2529
                   keep=self.node_secondary_ip.keys())
2530

    
2531
    # TODO: Can the instance lock be downgraded here? Take the optional disk
2532
    # shutdown in the caller into consideration.
2533

    
2534
    # Wait for sync
2535
    # This can fail as the old devices are degraded and _WaitForSync
2536
    # does a combined result over all disks, so we don't check its return value
2537
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2538
    WaitForSync(self.lu, self.instance)
2539

    
2540
    # Check all devices manually
2541
    self._CheckDevices(self.instance.primary_node, iv_names)
2542

    
2543
    # Step: remove old storage
2544
    if not self.early_release:
2545
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2546
      self._RemoveOldStorage(self.target_node, iv_names)