#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with storage of instances."""

import itertools
import logging
import os
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import ht
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import objects
from ganeti import utils
from ganeti import opcodes
from ganeti import rpc
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
  AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeName, \
  CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
  IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks
from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
  CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy

import ganeti.masterd.instance


_DISK_TEMPLATE_NAME_PREFIX = {
  constants.DT_PLAIN: "",
  constants.DT_RBD: ".rbd",
  constants.DT_EXT: ".ext",
  }


_DISK_TEMPLATE_DEVICE_TYPE = {
  constants.DT_PLAIN: constants.LD_LV,
  constants.DT_FILE: constants.LD_FILE,
  constants.DT_SHARED_FILE: constants.LD_FILE,
  constants.DT_BLOCK: constants.LD_BLOCKDEV,
  constants.DT_RBD: constants.LD_RBD,
  constants.DT_EXT: constants.LD_EXT,
  }


def CreateSingleBlockDev(lu, node, instance, device, info, force_open,
                         excl_stor):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as an LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  """
  lu.cfg.SetDiskID(device, node)
  result = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, force_open, info,
                                       excl_stor)
  result.Raise("Can't create block device %s on"
               " node %s for instance %s" % (device, node, instance.name))
  if device.physical_id is None:
    device.physical_id = result.payload


def _CreateBlockDevInner(lu, node, instance, device, force_create,
                         info, force_open, excl_stor):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @attention: The device has to be annotated already.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device for which
      CreateOnSecondary() returns True
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as an LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  @return: list of created devices
  """
  created_devices = []
  try:
    if device.CreateOnSecondary():
      force_create = True

    if device.children:
      for child in device.children:
        devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
                                    info, force_open, excl_stor)
        created_devices.extend(devs)

    if not force_create:
      return created_devices

    CreateSingleBlockDev(lu, node, instance, device, info, force_open,
                         excl_stor)
    # The device has been completely created, so there is no point in keeping
    # its subdevices in the list. We just add the device itself instead.
    created_devices = [(node, device)]
    return created_devices

  except errors.DeviceCreationError, e:
    e.created_devices.extend(created_devices)
    raise e
  except errors.OpExecError, e:
    raise errors.DeviceCreationError(str(e), created_devices)


def IsExclusiveStorageEnabledNodeName(cfg, nodename):
  """Whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type nodename: string
  @param nodename: The node
  @rtype: bool
  @return: The effective value of exclusive_storage
  @raise errors.OpPrereqError: if no node exists with the given name

  """
  ni = cfg.GetNodeInfo(nodename)
  if ni is None:
    raise errors.OpPrereqError("Invalid node name %s" % nodename,
                               errors.ECODE_NOENT)
  return IsExclusiveStorageEnabledNode(cfg, ni)


def _CreateBlockDev(lu, node, instance, device, force_create, info,
                    force_open):
  """Wrapper around L{_CreateBlockDevInner}.

  This method annotates the root device first.

  """
  (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
  excl_stor = IsExclusiveStorageEnabledNodeName(lu.cfg, node)
  return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
                              force_open, excl_stor)


def _UndoCreateDisks(lu, disks_created):
  """Undo the work performed by L{CreateDisks}.

  This function is called in case of an error to undo the work of
  L{CreateDisks}.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @param disks_created: the result returned by L{CreateDisks}

  """
  for (node, disk) in disks_created:
    lu.cfg.SetDiskID(disk, node)
    result = lu.rpc.call_blockdev_remove(node, disk)
    if result.fail_msg:
      logging.warning("Failed to remove newly-created disk %s on node %s:"
                      " %s", disk, node, result.fail_msg)


def CreateDisks(lu, instance, to_skip=None, target_node=None, disks=None):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @type to_skip: list
  @param to_skip: list of indices to skip
  @type target_node: string
  @param target_node: if passed, overrides the target node for creation
  @type disks: list of L{objects.Disk}
  @param disks: the disks to create; if not specified, all the disks of the
      instance are created
  @return: information about the created disks, to be used to call
      L{_UndoCreateDisks}
  @raise errors.OpPrereqError: in case of error

  """
  info = GetInstanceInfoText(instance)
  if target_node is None:
    pnode = instance.primary_node
    all_nodes = instance.all_nodes
  else:
    pnode = target_node
    all_nodes = [pnode]

  if disks is None:
    disks = instance.disks

  if instance.disk_template in constants.DTS_FILEBASED:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)

    result.Raise("Failed to create directory '%s' on"
                 " node %s" % (file_storage_dir, pnode))

  disks_created = []
  for idx, device in enumerate(disks):
    if to_skip and idx in to_skip:
      continue
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
    for node in all_nodes:
      f_create = node == pnode
      try:
        _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
        disks_created.append((node, device))
      except errors.DeviceCreationError, e:
        logging.warning("Creating disk %s for instance '%s' failed",
                        idx, instance.name)
        disks_created.extend(e.created_devices)
        _UndoCreateDisks(lu, disks_created)
        raise errors.OpExecError(e.message)
  return disks_created


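# Example (illustrative): the return value of CreateDisks is meant to be fed
# verbatim to _UndoCreateDisks for rollback, so a caller would typically do
# something along these lines ("lu" and "instance" being the LogicalUnit and
# objects.Instance at hand):
#
#   disks_created = CreateDisks(lu, instance)
#   try:
#     WipeDisks(lu, instance)
#   except errors.OpExecError:
#     _UndoCreateDisks(lu, disks_created)
#     raise
#
# WipeOrCleanupDisks below wraps the wipe-and-cleanup part of this pattern.
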
def ComputeDiskSizePerVG(disk_template, disks):
  """Compute disk size requirements in the volume group

  """
  def _compute(disks, payload):
    """Universal algorithm.

    """
    vgs = {}
    for disk in disks:
      vgs[disk[constants.IDISK_VG]] = \
        vgs.get(disk[constants.IDISK_VG], 0) + \
        disk[constants.IDISK_SIZE] + payload

    return vgs

  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: {},
    constants.DT_PLAIN: _compute(disks, 0),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
    constants.DT_FILE: {},
    constants.DT_SHARED_FILE: {},
    }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]


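# Example (illustrative): for two 1024 MiB disks in the same volume group
# "xenvg", the plain template needs 2048 MiB in that VG, while drbd8 adds
# constants.DRBD_META_SIZE (128 MiB, per the comment above) for each disk:
#
#   disks = [{constants.IDISK_VG: "xenvg", constants.IDISK_SIZE: 1024},
#            {constants.IDISK_VG: "xenvg", constants.IDISK_SIZE: 1024}]
#   ComputeDiskSizePerVG(constants.DT_PLAIN, disks)  # {"xenvg": 2048}
#   ComputeDiskSizePerVG(constants.DT_DRBD8, disks)  # {"xenvg": 2304}
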
def ComputeDisks(op, default_vg):
  """Computes the instance disks.

  @param op: The instance opcode
  @param default_vg: The default volume group to assume

  @return: The computed disks

  """
  disks = []
  for disk in op.disks:
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
    if mode not in constants.DISK_ACCESS_SET:
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                 mode, errors.ECODE_INVAL)
    size = disk.get(constants.IDISK_SIZE, None)
    if size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      size = int(size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                 errors.ECODE_INVAL)

    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
    if ext_provider and op.disk_template != constants.DT_EXT:
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
                                 " disk template, not %s" %
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
                                  op.disk_template), errors.ECODE_INVAL)

    data_vg = disk.get(constants.IDISK_VG, default_vg)
    name = disk.get(constants.IDISK_NAME, None)
    if name is not None and name.lower() == constants.VALUE_NONE:
      name = None
    new_disk = {
      constants.IDISK_SIZE: size,
      constants.IDISK_MODE: mode,
      constants.IDISK_VG: data_vg,
      constants.IDISK_NAME: name,
      }

    for key in [
      constants.IDISK_METAVG,
      constants.IDISK_ADOPT,
      constants.IDISK_SPINDLES,
      ]:
      if key in disk:
        new_disk[key] = disk[key]

    # For extstorage, demand the `provider' option and add any
    # additional parameters (ext-params) to the dict
    if op.disk_template == constants.DT_EXT:
      if ext_provider:
        new_disk[constants.IDISK_PROVIDER] = ext_provider
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            new_disk[key] = disk[key]
      else:
        raise errors.OpPrereqError("Missing provider for template '%s'" %
                                   constants.DT_EXT, errors.ECODE_INVAL)

    disks.append(new_disk)

  return disks


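# Example (illustrative): an opcode whose disks field is
# [{constants.IDISK_SIZE: 512}], with a default_vg of "xenvg", comes out of
# ComputeDisks as
#
#   [{constants.IDISK_SIZE: 512,
#     constants.IDISK_MODE: constants.DISK_RDWR,
#     constants.IDISK_VG: "xenvg",
#     constants.IDISK_NAME: None}]
#
# i.e. missing fields are filled with defaults; keys outside
# constants.IDISK_PARAMS are only carried over for the ext disk template.
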
def CheckRADOSFreeSpace():
  """Compute disk size requirements inside the RADOS cluster.

  """
  # For the RADOS cluster we assume there is always enough space.
  pass


def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
                         iv_name, p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  assert len(vgnames) == len(names) == 2
  port = lu.cfg.AllocatePort()
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())

  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgnames[0], names[0]),
                          params={})
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  dev_meta = objects.Disk(dev_type=constants.LD_LV,
                          size=constants.DRBD_META_SIZE,
                          logical_id=(vgnames[1], names[1]),
                          params={})
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name, params={})
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  return drbd_dev


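# Sketch (illustrative) of the device tree that _GenerateDRBD8Branch builds
# for one disk: the top-level DRBD8 device carries the network endpoints and
# the shared secret in its logical_id, while its two LVM children hold the
# data and the fixed-size metadata:
#
#   LD_DRBD8   size=disk size,
#              logical_id=(primary, secondary, port, p_minor, s_minor, secret)
#     +- LD_LV "..._data"  size=disk size
#     +- LD_LV "..._meta"  size=constants.DRBD_META_SIZE
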
def GenerateDiskTemplate(
  lu, template_name, instance_name, primary_node, secondary_nodes,
  disk_info, file_storage_dir, file_driver, base_index,
  feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
  _req_shr_file_storage=opcodes.RequireSharedFileStorage):
  """Generate the entire disk layout for a given template type.

  """
  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []

  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, remote_node] * len(disk_info), instance_name)

    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
                                                       full_disk_params)
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]

    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      data_vg = disk.get(constants.IDISK_VG, vgname)
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                      disk[constants.IDISK_SIZE],
                                      [data_vg, meta_vg],
                                      names[idx * 2:idx * 2 + 2],
                                      "disk/%d" % disk_index,
                                      minors[idx * 2], minors[idx * 2 + 1])
      disk_dev.mode = disk[constants.IDISK_MODE]
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disks.append(disk_dev)
  else:
    if secondary_nodes:
      raise errors.ProgrammerError("Wrong template configuration")

    if template_name == constants.DT_FILE:
      _req_file_storage()
    elif template_name == constants.DT_SHARED_FILE:
      _req_shr_file_storage()

    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
    if name_prefix is None:
      names = None
    else:
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
                                        (name_prefix, base_index + i)
                                        for i in range(disk_count)])

    if template_name == constants.DT_PLAIN:

      def logical_id_fn(idx, _, disk):
        vg = disk.get(constants.IDISK_VG, vgname)
        return (vg, names[idx])

    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
      logical_id_fn = \
        lambda _, disk_index, disk: (file_driver,
                                     "%s/disk%d" % (file_storage_dir,
                                                    disk_index))
    elif template_name == constants.DT_BLOCK:
      logical_id_fn = \
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
                                       disk[constants.IDISK_ADOPT])
    elif template_name == constants.DT_RBD:
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
    elif template_name == constants.DT_EXT:
      def logical_id_fn(idx, _, disk):
        provider = disk.get(constants.IDISK_PROVIDER, None)
        if provider is None:
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
                                       " not found", constants.DT_EXT,
                                       constants.IDISK_PROVIDER)
        return (provider, names[idx])
    else:
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)

    dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]

    for idx, disk in enumerate(disk_info):
      params = {}
      # Only for the Ext template add disk_info to params
      if template_name == constants.DT_EXT:
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            params[key] = disk[key]
      disk_index = idx + base_index
      size = disk[constants.IDISK_SIZE]
      feedback_fn("* disk %s, size %s" %
                  (disk_index, utils.FormatUnit(size, "h")))
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
                              logical_id=logical_id_fn(idx, disk_index, disk),
                              iv_name="disk/%d" % disk_index,
                              mode=disk[constants.IDISK_MODE],
                              params=params,
                              spindles=disk.get(constants.IDISK_SPINDLES))
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
      disks.append(disk_dev)

  return disks


def CheckSpindlesExclusiveStorage(diskdict, es_flag):
  """Check the presence of the spindle options with exclusive_storage.

  @type diskdict: dict
  @param diskdict: disk parameters
  @type es_flag: bool
  @param es_flag: the effective value of the exclusive_storage flag
  @raise errors.OpPrereqError: when spindles are given and they should not

  """
  if (not es_flag and constants.IDISK_SPINDLES in diskdict and
      diskdict[constants.IDISK_SPINDLES] is not None):
    raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
                               " when exclusive storage is not active",
                               errors.ECODE_INVAL)


class LUInstanceRecreateDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  _MODIFYABLE = compat.UniqueFrozenset([
    constants.IDISK_SIZE,
    constants.IDISK_MODE,
    constants.IDISK_SPINDLES,
    ])

  # New or changed disk parameters may have different semantics
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
    constants.IDISK_ADOPT,

    # TODO: Implement support for changing VG while recreating
    constants.IDISK_VG,
    constants.IDISK_METAVG,
    constants.IDISK_PROVIDER,
    constants.IDISK_NAME,
    ]))

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)

    # FIXME
    # The allocator should actually run in "relocate" mode, but current
    # allocators don't support relocating all the nodes of an instance at
    # the same time. As a workaround we use "allocate" mode, but this is
    # suboptimal for two reasons:
    # - The instance name passed to the allocator is present in the list of
    #   existing instances, so there could be a conflict within the
    #   internal structures of the allocator. This doesn't happen with the
    #   current allocators, but it's a liability.
    # - The allocator counts the resources used by the instance twice: once
    #   because the instance exists already, and once because it tries to
    #   allocate a new instance.
    # The allocator could choose some of the nodes on which the instance is
    # running, but that's not a problem. If the instance nodes are broken,
    # they should already be marked as drained or offline, and hence
    # skipped by the allocator. If instance disks have been lost for other
    # reasons, then recreating the disks on the same nodes should be fine.
    disk_template = self.instance.disk_template
    spindle_use = be_full[constants.BE_SPINDLE_USE]
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
                                        disk_template=disk_template,
                                        tags=list(self.instance.GetTags()),
                                        os=self.instance.os,
                                        nics=[{}],
                                        vcpus=be_full[constants.BE_VCPUS],
                                        memory=be_full[constants.BE_MAXMEM],
                                        spindle_use=spindle_use,
                                        disks=[{constants.IDISK_SIZE: d.size,
                                                constants.IDISK_MODE: d.mode}
                                               for d in self.instance.disks],
                                        hypervisor=self.instance.hypervisor,
                                        node_whitelist=None)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    assert req.RequiredNodes() == len(self.instance.all_nodes)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    self.op.nodes = ial.result
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 utils.CommaJoin(ial.result))

  def CheckArguments(self):
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
      # Normalize and convert deprecated list of disk indices
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]

    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
    if duplicates:
      raise errors.OpPrereqError("Some disks have been specified more than"
                                 " once: %s" % utils.CommaJoin(duplicates),
                                 errors.ECODE_INVAL)

    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
    # when neither iallocator nor nodes are specified
    if self.op.iallocator or self.op.nodes:
      CheckIAllocatorOrNode(self, "iallocator", "nodes")

    for (idx, params) in self.op.disks:
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
      if unsupported:
        raise errors.OpPrereqError("Parameters for disk %s try to change"
                                   " unmodifiable parameter(s): %s" %
                                   (idx, utils.CommaJoin(unsupported)),
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    if self.op.nodes:
      self.op.nodes = [ExpandNodeName(self.cfg, n) for n in self.op.nodes]
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      if self.op.iallocator:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.iallocator is not None
      assert not self.op.nodes
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock the primary group used by the instance optimistically; this
      # requires going via the node before it's locked, requiring
      # verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)

    elif level == locking.LEVEL_NODE:
      # If an allocator is used, then we lock all the nodes in the current
      # instance group, as we don't know yet which ones will be selected;
      # if we replace the nodes without using an allocator, locks are
      # already declared in ExpandNames; otherwise, we need to lock all the
      # instance nodes for disk re-creation
      if self.op.iallocator:
        assert not self.op.nodes
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1

        # Lock member nodes of the group of the primary node
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
          self.needed_locks[locking.LEVEL_NODE].extend(
            self.cfg.GetNodeGroup(group_uuid).members)

        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
      elif not self.op.nodes:
        self._LockInstancesNodes(primary_only=False)
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    return BuildInstanceHookEnvByObject(self, self.instance)

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    if self.op.nodes:
      if len(self.op.nodes) != len(instance.all_nodes):
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
                                   " %d replacement nodes were specified" %
                                   (instance.name, len(instance.all_nodes),
                                    len(self.op.nodes)),
                                   errors.ECODE_INVAL)
      assert instance.disk_template != constants.DT_DRBD8 or \
             len(self.op.nodes) == 2
      assert instance.disk_template != constants.DT_PLAIN or \
             len(self.op.nodes) == 1
      primary_node = self.op.nodes[0]
    else:
      primary_node = instance.primary_node
    if not self.op.iallocator:
      CheckNodeOnline(self, primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name, errors.ECODE_INVAL)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      # Node group locks are acquired only for the primary node (and only
      # when the allocator is used)
      CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
                              primary_only=True)

    # if we replace nodes *and* the old primary is offline, we don't
    # check the instance state
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
    if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                         msg="cannot recreate disks")

    if self.op.disks:
      self.disks = dict(self.op.disks)
    else:
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))

    maxidx = max(self.disks.keys())
    if maxidx >= len(instance.disks):
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
                                 errors.ECODE_INVAL)

    if ((self.op.nodes or self.op.iallocator) and
         sorted(self.disks.keys()) != range(len(instance.disks))):
      raise errors.OpPrereqError("Can't recreate disks partially and"
                                 " change the nodes at the same time",
                                 errors.ECODE_INVAL)

    self.instance = instance

    if self.op.iallocator:
      self._RunAllocator()
      # Release unneeded node and node resource locks
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)

    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

    if self.op.nodes:
      nodes = self.op.nodes
    else:
      nodes = instance.all_nodes
    excl_stor = compat.any(
      rpc.GetExclusiveStorageForNodeNames(self.cfg, nodes).values()
      )
    for new_params in self.disks.values():
      CheckSpindlesExclusiveStorage(new_params, excl_stor)

  def Exec(self, feedback_fn):
    """Recreate the disks.

    """
    instance = self.instance

    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    to_skip = []
    mods = [] # keeps track of needed changes

    for idx, disk in enumerate(instance.disks):
      try:
        changes = self.disks[idx]
      except KeyError:
        # Disk should not be recreated
        to_skip.append(idx)
        continue

      # update secondaries for disks, if needed
      if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
        # need to update the nodes and minors
        assert len(self.op.nodes) == 2
        assert len(disk.logical_id) == 6 # otherwise disk internals
                                         # have changed
        (_, _, old_port, _, _, old_secret) = disk.logical_id
        new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
        new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
                  new_minors[0], new_minors[1], old_secret)
        assert len(disk.logical_id) == len(new_id)
      else:
        new_id = None

      mods.append((idx, new_id, changes))

    # now that we have passed all asserts above, we can apply the mods
    # in a single run (to avoid partial changes)
    for idx, new_id, changes in mods:
      disk = instance.disks[idx]
      if new_id is not None:
        assert disk.dev_type == constants.LD_DRBD8
        disk.logical_id = new_id
      if changes:
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
                    mode=changes.get(constants.IDISK_MODE, None),
                    spindles=changes.get(constants.IDISK_SPINDLES, None))

    # change primary node, if needed
    if self.op.nodes:
      instance.primary_node = self.op.nodes[0]
      self.LogWarning("Changing the instance's nodes, you will have to"
                      " remove any disks left on the older nodes manually")

    if self.op.nodes:
      self.cfg.Update(instance, feedback_fn)

    # All touched nodes must be locked
    mylocks = self.owned_locks(locking.LEVEL_NODE)
    assert mylocks.issuperset(frozenset(instance.all_nodes))
    new_disks = CreateDisks(self, instance, to_skip=to_skip)

    # TODO: Release node locks before wiping, or explain why it's not possible
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
      wipedisks = [(idx, disk, 0)
                   for (idx, disk) in enumerate(instance.disks)
                   if idx not in to_skip]
      WipeOrCleanupDisks(self, instance, disks=wipedisks, cleanup=new_disks)


def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
  """Checks if nodes have enough free disk space in the specified VG.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type nodenames: C{list}
  @param nodenames: the list of node names to check
  @type vg: C{str}
  @param vg: the volume group to check
  @type requested: C{int}
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
  # FIXME: This maps everything to storage type 'lvm-vg' to maintain
  # the current functionality. Refactor to make it more flexible.
  nodeinfo = lu.rpc.call_node_info(nodenames, [(constants.ST_LVM_VG, vg)], None,
                                   es_flags)
  for node in nodenames:
    info = nodeinfo[node]
    info.Raise("Cannot get current information from node %s" % node,
               prereq=True, ecode=errors.ECODE_ENVIRON)
    (_, (vg_info, ), _) = info.payload
    vg_free = vg_info.get("vg_free", None)
    if not isinstance(vg_free, int):
      raise errors.OpPrereqError("Can't compute free disk space on node"
                                 " %s for vg %s, result was '%s'" %
                                 (node, vg, vg_free), errors.ECODE_ENVIRON)
    if requested > vg_free:
      raise errors.OpPrereqError("Not enough disk space on target node %s"
                                 " vg %s: required %d MiB, available %d MiB" %
                                 (node, vg, requested, vg_free),
                                 errors.ECODE_NORES)


def CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
  """Checks if nodes have enough free disk space in all the VGs.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type nodenames: C{list}
  @param nodenames: the list of node names to check
  @type req_sizes: C{dict}
  @param req_sizes: the hash of vg and corresponding amount of disk in
      MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  for vg, req_size in req_sizes.items():
    _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)


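# Typical usage (illustrative): the req_sizes mapping is exactly what
# ComputeDiskSizePerVG returns, so a prereq check in a LogicalUnit usually
# looks like
#
#   req_sizes = ComputeDiskSizePerVG(self.op.disk_template, disks)
#   CheckNodesFreeDiskPerVG(self, nodenames, req_sizes)
#
# with "disks" and "nodenames" prepared by the calling LU.
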
def _DiskSizeInBytesToMebibytes(lu, size):
  """Converts a disk size in bytes to mebibytes.

  Warns and rounds up if the size isn't an even multiple of 1 MiB.

  """
  (mib, remainder) = divmod(size, 1024 * 1024)

  if remainder != 0:
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
                  " to not overwrite existing data (%s bytes will not be"
                  " wiped)", (1024 * 1024) - remainder)
    mib += 1

  return mib


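# Example (illustrative): one byte more than 1 GiB, i.e. 1073741825 bytes,
# gives divmod(1073741825, 1048576) == (1024, 1); the function then warns
# that the trailing 1048575 bytes will not be wiped and returns 1025.
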
def _CalcEta(time_taken, written, total_size):
  """Calculates the ETA based on size written and total size.

  @param time_taken: The time taken so far
  @param written: amount written so far
  @param total_size: The total size of data to be written
  @return: The remaining time in seconds

  """
  avg_time = time_taken / float(written)
  return (total_size - written) * avg_time


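# Example (illustrative): if 512 MiB out of 2048 MiB were written in 60
# seconds, the average is 60 / 512.0 seconds per MiB and
# _CalcEta(60, 512, 2048) == (2048 - 512) * (60 / 512.0) == 180.0 seconds.
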
def WipeDisks(lu, instance, disks=None):
  """Wipes instance disks.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
  @param disks: Disk details; tuple contains disk index, disk object and the
    start offset

  """
  node = instance.primary_node

  if disks is None:
    disks = [(idx, disk, 0)
             for (idx, disk) in enumerate(instance.disks)]

  for (_, device, _) in disks:
    lu.cfg.SetDiskID(device, node)

  logging.info("Pausing synchronization of disks of instance '%s'",
               instance.name)
  result = lu.rpc.call_blockdev_pause_resume_sync(node,
                                                  (map(compat.snd, disks),
                                                   instance),
                                                  True)
  result.Raise("Failed to pause disk synchronization on node '%s'" % node)

  for idx, success in enumerate(result.payload):
    if not success:
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
                   " failed", idx, instance.name)

  try:
    for (idx, device, offset) in disks:
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
      wipe_chunk_size = \
        int(min(constants.MAX_WIPE_CHUNK,
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))

      size = device.size
      last_output = 0
      start_time = time.time()

      if offset == 0:
        info_text = ""
      else:
        info_text = (" (from %s to %s)" %
                     (utils.FormatUnit(offset, "h"),
                      utils.FormatUnit(size, "h")))

      lu.LogInfo("* Wiping disk %s%s", idx, info_text)

      logging.info("Wiping disk %d for instance %s on node %s using"
                   " chunk size %s", idx, instance.name, node, wipe_chunk_size)

      while offset < size:
        wipe_size = min(wipe_chunk_size, size - offset)

        logging.debug("Wiping disk %d, offset %s, chunk %s",
                      idx, offset, wipe_size)

        result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
                                           wipe_size)
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
                     (idx, offset, wipe_size))

        now = time.time()
        offset += wipe_size
        if now - last_output >= 60:
          eta = _CalcEta(now - start_time, offset, size)
          lu.LogInfo(" - done: %.1f%% ETA: %s",
                     offset / float(size) * 100, utils.FormatSeconds(eta))
          last_output = now
  finally:
    logging.info("Resuming synchronization of disks for instance '%s'",
                 instance.name)

    result = lu.rpc.call_blockdev_pause_resume_sync(node,
                                                    (map(compat.snd, disks),
                                                     instance),
                                                    False)

    if result.fail_msg:
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
                    node, result.fail_msg)
    else:
      for idx, success in enumerate(result.payload):
        if not success:
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
                        " failed", idx, instance.name)


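# Example (illustrative), assuming the usual values of the constants
# (MIN_WIPE_CHUNK_PERCENT == 10, MAX_WIPE_CHUNK == 1024 MiB): a 4096 MiB disk
# is wiped in chunks of int(min(1024, 4096 / 100.0 * 10)) == 409 MiB, while
# any disk larger than 10240 MiB is capped at 1024 MiB chunks.
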
def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
  """Wrapper for L{WipeDisks} that handles errors.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @param disks: see L{WipeDisks}
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
      case of error
  @raise errors.OpExecError: in case of failure

  """
  try:
    WipeDisks(lu, instance, disks=disks)
  except errors.OpExecError:
    logging.warning("Wiping disks for instance '%s' failed",
                    instance.name)
    _UndoCreateDisks(lu, cleanup)
    raise


def ExpandCheckDisks(instance, disks):
  """Return the instance disks selected by the disks list

  @type disks: list of L{objects.Disk} or None
  @param disks: selected disks
  @rtype: list of L{objects.Disk}
  @return: selected instance disks to act on

  """
  if disks is None:
    return instance.disks
  else:
    if not set(disks).issubset(instance.disks):
      raise errors.ProgrammerError("Can only act on disks belonging to the"
                                   " target instance")
    return disks


def WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)

  node = instance.primary_node

  for dev in disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.LogInfo("- device %s: %5.2f%% done, %s",
                   disks[i].iv_name, mstat.sync_percent, rem_time)

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.LogInfo("Instance %s's disks are in sync", instance.name)

  return not cumul_degraded


def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is false, errors on the primary node are not
  ignored.

  """
  all_result = True
  disks = ExpandCheckDisks(instance, disks)

  for disk in disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if ((node == instance.primary_node and not ignore_primary) or
            (node != instance.primary_node and not result.offline)):
          all_result = False
  return all_result


def _SafeShutdownInstanceDisks(lu, instance, disks=None):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  L{ShutdownInstanceDisks}.

  """
  CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
  ShutdownInstanceDisks(lu, instance, disks=disks)


def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
                           ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we assemble
  @type disks: list of L{objects.Disk} or None
  @param disks: which disks to assemble (or all, if None)
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: a tuple of (disks_ok, device_info); disks_ok is False if the
      operation failed, and device_info is a list of
      (host, instance_visible_name, node_visible_name) tuples
      with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  disks = ExpandCheckDisks(instance, disks)

  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for idx, inst_disk in enumerate(disks):
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
                                             False, idx)
      msg = result.fail_msg
      if msg:
        is_offline_secondary = (node in instance.secondary_nodes and
                                result.offline)
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=False, pass=1): %s",
                      inst_disk.iv_name, node, msg)
        if not (ignore_secondaries or is_offline_secondary):
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for idx, inst_disk in enumerate(disks):
    dev_path = None

    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
                                             True, idx)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=True, pass=2): %s",
                      inst_disk.iv_name, node, msg)
        disks_ok = False
      else:
        dev_path = result.payload

    device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


def StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = AssembleInstanceDisks(lu, instance,
                                      ignore_secondaries=force)
  if not disks_ok:
    ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.LogWarning("",
                    hint=("If the message above refers to a secondary node,"
                          " you can retry the operation using '--force'"))
    raise errors.OpExecError("Disk consistency error")


class LUInstanceGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.needed_locks[locking.LEVEL_NODE_RES] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      "ABSOLUTE": self.op.absolute,
      }
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    nodenames = list(instance.all_nodes)
    for node in nodenames:
      CheckNodeOnline(self, node)

    self.instance = instance

    if instance.disk_template not in constants.DTS_GROWABLE:
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing", errors.ECODE_INVAL)

    self.disk = instance.FindDisk(self.op.disk)

    if self.op.absolute:
      self.target = self.op.amount
      self.delta = self.target - self.disk.size
      if self.delta < 0:
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
                                   "current disk size (%s)" %
                                   (utils.FormatUnit(self.target, "h"),
                                    utils.FormatUnit(self.disk.size, "h")),
                                   errors.ECODE_STATE)
    else:
      self.delta = self.op.amount
      self.target = self.disk.size + self.delta
      if self.delta < 0:
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
                                   utils.FormatUnit(self.delta, "h"),
                                   errors.ECODE_INVAL)

    self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))

  def _CheckDiskSpace(self, nodenames, req_vgspace):
    template = self.instance.disk_template
    if template not in (constants.DTS_NO_FREE_SPACE_CHECK):
      # TODO: check the free disk space for file, when that feature will be
      # supported
      nodes = map(self.cfg.GetNodeInfo, nodenames)
      es_nodes = filter(lambda n: IsExclusiveStorageEnabledNode(self.cfg, n),
                        nodes)
      if es_nodes:
        # With exclusive storage we need to do something smarter than just
        # looking at free space; for now, let's simply abort the operation.
        raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
                                   " is enabled", errors.ECODE_STATE)
      CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = self.disk

    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks

    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[disk])
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block device to grow")

    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1429
                (self.op.disk, instance.name,
1430
                 utils.FormatUnit(self.delta, "h"),
1431
                 utils.FormatUnit(self.target, "h")))
1432

    
1433
    # First run all grow ops in dry-run mode
1434
    for node in instance.all_nodes:
1435
      self.cfg.SetDiskID(disk, node)
1436
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1437
                                           True, True)
1438
      result.Raise("Dry-run grow request failed to node %s" % node)

    if wipe_disks:
      # Get disk size from primary node for wiping
      result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk])
      result.Raise("Failed to retrieve disk size from node '%s'" %
                   instance.primary_node)

      (disk_size_in_bytes, ) = result.payload

      if disk_size_in_bytes is None:
        raise errors.OpExecError("Failed to retrieve disk size from primary"
                                 " node '%s'" % instance.primary_node)

      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)

      assert old_disk_size >= disk.size, \
        ("Retrieved disk size too small (got %s, should be at least %s)" %
         (old_disk_size, disk.size))
    else:
      old_disk_size = None

    # We know that (as far as we can test) operations across different
    # nodes will succeed, time to run it for real on the backing storage
    for node in instance.all_nodes:
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
                                           False, True)
      result.Raise("Grow request failed to node %s" % node)

    # And now execute it for logical storage, on the primary node
    node = instance.primary_node
    self.cfg.SetDiskID(disk, node)
    result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
                                         False, False)
    result.Raise("Grow request failed to node %s" % node)

    disk.RecordGrow(self.delta)
    self.cfg.Update(instance, feedback_fn)

    # Changes have been recorded, release node lock
    ReleaseLocks(self, locking.LEVEL_NODE)

    # Downgrade lock while waiting for sync
    self.glm.downgrade(locking.LEVEL_INSTANCE)
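    # Note: the node locks are gone and the instance lock is now shared, so
    # the potentially long wipe and sync wait below do not block other jobs;
    # the node resource locks are still held (asserted at the end of Exec).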

    assert wipe_disks ^ (old_disk_size is None)

    if wipe_disks:
      assert instance.disks[self.op.disk] == disk

      # Wipe newly added disk space
      WipeDisks(self, instance,
                disks=[(self.op.disk, disk, old_disk_size)])

    if self.op.wait_for_sync:
      disk_abort = not WaitForSync(self, instance, disks=[disk])
      if disk_abort:
        self.LogWarning("Disk syncing has not returned a good status; check"
                        " the instance")
      if instance.admin_state != constants.ADMINST_UP:
        _SafeShutdownInstanceDisks(self, instance, disks=[disk])
    elif instance.admin_state != constants.ADMINST_UP:
      self.LogWarning("Not shutting down the disk even if the instance is"
                      " not supposed to be running because no wait for"
                      " sync mode was requested")

    assert self.owned_locks(locking.LEVEL_NODE_RES)
    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)


class LUInstanceReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check arguments.

    """
    remote_node = self.op.remote_node
    ialloc = self.op.iallocator
    if self.op.mode == constants.REPLACE_DISK_CHG:
      if remote_node is None and ialloc is None:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given", errors.ECODE_INVAL)
      else:
        CheckIAllocatorOrNode(self, "iallocator", "remote_node")

    elif remote_node is not None or ialloc is not None:
      # Not replacing the secondary
      raise errors.OpPrereqError("The iallocator and new node options can"
                                 " only be used when changing the"
                                 " secondary node", errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    assert locking.LEVEL_NODE not in self.needed_locks
    assert locking.LEVEL_NODE_RES not in self.needed_locks
    assert locking.LEVEL_NODEGROUP not in self.needed_locks

    assert self.op.iallocator is None or self.op.remote_node is None, \
      "Conflicting options"

    if self.op.remote_node is not None:
      self.op.remote_node = ExpandNodeName(self.cfg, self.op.remote_node)

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8Dev.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

      if self.op.iallocator is not None:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

    self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
                                   self.op.iallocator, self.op.remote_node,
                                   self.op.disks, self.op.early_release,
                                   self.op.ignore_ipolicy)

    self.tasklets = [self.replacer]
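    # The actual replacement work is done by the TLReplaceDisks tasklet
    # defined further below; this LU only takes care of argument checking,
    # locking and hooks.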

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.remote_node is None
      assert self.op.iallocator is not None
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock all groups used by instance optimistically; this requires going
      # via the node before it's locked, requiring verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_name)

    elif level == locking.LEVEL_NODE:
      if self.op.iallocator is not None:
        assert self.op.remote_node is None
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)

        # Lock member nodes of all locked groups
        self.needed_locks[locking.LEVEL_NODE] = \
          [node_name
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
           for node_name in self.cfg.GetNodeGroup(group_uuid).members]
      else:
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

        self._LockInstancesNodes()

    elif level == locking.LEVEL_NODE_RES:
      # Reuse node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        self.needed_locks[locking.LEVEL_NODE]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    instance = self.replacer.instance
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": instance.secondary_nodes[0],
      }
    env.update(BuildInstanceHookEnvByObject(self, instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self.replacer.instance
    nl = [
      self.cfg.GetMasterNode(),
      instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
            self.op.iallocator is None)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)

    return LogicalUnit.CheckPrereq(self)


class LUInstanceActivateDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              AssembleInstanceDisks(self, self.instance,
                                    ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    if self.op.wait_for_sync:
      if not WaitForSync(self, self.instance):
        raise errors.OpExecError("Some disks of the instance are degraded!")

    return disks_info


class LUInstanceDeactivateDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    if self.op.force:
      ShutdownInstanceDisks(self, instance)
    else:
      _SafeShutdownInstanceDisks(self, instance)


def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
                               ldisk=False):
  """Check that mirrors are not degraded.

  @attention: The device has to be annotated already.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
                                                     on_primary)

  return result


def CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
  """Wrapper around L{_CheckDiskConsistencyInner}.

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
                                    ldisk=ldisk)


def _BlockdevFind(lu, node, dev, instance):
  """Wrapper around call_blockdev_find to annotate diskparams.

  @param lu: A reference to the lu object
  @param node: The node to call out
  @param dev: The device to find
  @param instance: The instance object the device belongs to
  @returns The result of the rpc call

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return lu.rpc.call_blockdev_find(node, disk)


def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
    results.append("%s%s" % (new_id, val))
  return results


class TLReplaceDisks(Tasklet):
  """Replaces disks for an instance.

  Note: Locking is not within the scope of this class.

  """
  def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
               disks, early_release, ignore_ipolicy):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_name = instance_name
    self.mode = mode
    self.iallocator_name = iallocator_name
    self.remote_node = remote_node
    self.disks = disks
    self.early_release = early_release
    self.ignore_ipolicy = ignore_ipolicy

    # Runtime data
    self.instance = None
    self.new_node = None
    self.target_node = None
    self.other_node = None
    self.remote_node_info = None
    self.node_secondary_ip = None

  @staticmethod
  def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
    """Compute a new secondary node using an IAllocator.

    """
    req = iallocator.IAReqRelocate(name=instance_name,
                                   relocate_from=list(relocate_from))
    ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)

    ial.Run(iallocator_name)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (iallocator_name, ial.info),
                                 errors.ECODE_NORES)

    remote_node_name = ial.result[0]

    lu.LogInfo("Selected new secondary for instance '%s': %s",
               instance_name, remote_node_name)

    return remote_node_name

  def _FindFaultyDisks(self, node_name):
    """Wrapper for L{FindFaultyInstanceDisks}.

    """
    return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
                                   node_name, True)

  def _CheckDisksActivated(self, instance):
    """Checks if the instance disks are activated.

    @param instance: The instance to check disks
    @return: True if they are activated, False otherwise

    """
    nodes = instance.all_nodes

    for idx, dev in enumerate(instance.disks):
      for node in nodes:
        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
        self.cfg.SetDiskID(dev, node)

        result = _BlockdevFind(self, node, dev, instance)

        if result.offline:
          continue
        elif result.fail_msg or not result.payload:
          return False

    return True

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.instance_name

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances", errors.ECODE_INVAL)

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes),
                                 errors.ECODE_FAULT)

    instance = self.instance
    secondary_node = instance.secondary_nodes[0]

    if self.iallocator_name is None:
      remote_node = self.remote_node
    else:
      remote_node = self._RunAllocator(self.lu, self.iallocator_name,
                                       instance.name, instance.secondary_nodes)

    if remote_node is None:
      self.remote_node_info = None
    else:
      assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \
             "Remote node '%s' is not locked" % remote_node

      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node

    if remote_node == self.instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance", errors.ECODE_INVAL)

    if remote_node == secondary_node:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance",
                                 errors.ECODE_INVAL)

    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
                                    constants.REPLACE_DISK_CHG):
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
                                 errors.ECODE_INVAL)

    if self.mode == constants.REPLACE_DISK_AUTO:
      if not self._CheckDisksActivated(instance):
        raise errors.OpPrereqError("Please run activate-disks on instance %s"
                                   " first" % self.instance_name,
                                   errors.ECODE_STATE)
      faulty_primary = self._FindFaultyDisks(instance.primary_node)
      faulty_secondary = self._FindFaultyDisks(secondary_node)

      if faulty_primary and faulty_secondary:
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
                                   " one node and can not be repaired"
                                   " automatically" % self.instance_name,
                                   errors.ECODE_STATE)

      if faulty_primary:
        self.disks = faulty_primary
        self.target_node = instance.primary_node
        self.other_node = secondary_node
        check_nodes = [self.target_node, self.other_node]
      elif faulty_secondary:
        self.disks = faulty_secondary
        self.target_node = secondary_node
        self.other_node = instance.primary_node
        check_nodes = [self.target_node, self.other_node]
      else:
        self.disks = []
        check_nodes = []

    else:
      # Non-automatic modes
      if self.mode == constants.REPLACE_DISK_PRI:
        self.target_node = instance.primary_node
        self.other_node = secondary_node
        check_nodes = [self.target_node, self.other_node]

      elif self.mode == constants.REPLACE_DISK_SEC:
        self.target_node = secondary_node
        self.other_node = instance.primary_node
        check_nodes = [self.target_node, self.other_node]

      elif self.mode == constants.REPLACE_DISK_CHG:
        self.new_node = remote_node
        self.other_node = instance.primary_node
        self.target_node = secondary_node
        check_nodes = [self.new_node, self.other_node]

        CheckNodeNotDrained(self.lu, remote_node)
        CheckNodeVmCapable(self.lu, remote_node)

        old_node_info = self.cfg.GetNodeInfo(secondary_node)
        assert old_node_info is not None
        if old_node_info.offline and not self.early_release:
          # doesn't make sense to delay the release
          self.early_release = True
          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
                          " early-release mode", secondary_node)

      else:
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                     self.mode)

      # If not specified all disks should be replaced
      if not self.disks:
        self.disks = range(len(self.instance.disks))

    # TODO: This is ugly, but right now we can't distinguish between an
    # internally submitted opcode and an external one. We should fix that.
    if self.remote_node_info:
      # We change the node, let's verify it still meets instance policy
      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
      cluster = self.cfg.GetClusterInfo()
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              new_group_info)
      CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
                             self.cfg, ignore=self.ignore_ipolicy)

    for node in check_nodes:
      CheckNodeOnline(self.lu, node)

    touched_nodes = frozenset(node_name for node_name in [self.new_node,
                                                          self.other_node,
                                                          self.target_node]
                              if node_name is not None)

    # Release unneeded node and node resource locks
    ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    # Release any owned node group
    ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)

    # Check whether disks are valid
    for disk_idx in self.disks:
      instance.FindDisk(disk_idx)

    # Get secondary node IP addresses
    self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node)
                                  in self.cfg.GetMultiNodeInfo(touched_nodes))
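    # node_secondary_ip maps every touched node to its secondary (replication)
    # IP address; it is used to keep the right locks later on and by the drbd
    # disconnect/attach RPCs when the secondary node is being changed.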

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    if __debug__:
      # Verify owned locks before starting operation
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
      assert set(owned_nodes) == set(self.node_secondary_ip), \
          ("Incorrect node locks, owning %s, expected %s" %
           (owned_nodes, self.node_secondary_ip.keys()))
      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
              self.lu.owned_locks(locking.LEVEL_NODE_RES))
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
      assert list(owned_instances) == [self.instance_name], \
          "Instance '%s' not locked" % self.instance_name

      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
          "Should not own any node group lock at this point"

    if not self.disks:
      feedback_fn("No disks need replacement for instance '%s'" %
                  self.instance.name)
      return

    feedback_fn("Replacing disk(s) %s for instance '%s'" %
                (utils.CommaJoin(self.disks), self.instance.name))
    feedback_fn("Current primary node: %s" % self.instance.primary_node)
    feedback_fn("Current secondary node: %s" %
                utils.CommaJoin(self.instance.secondary_nodes))

    activate_disks = (self.instance.admin_state != constants.ADMINST_UP)

    # Activate the instance disks if we're replacing them on a down instance
    if activate_disks:
      StartInstanceDisks(self.lu, self.instance, True)

    try:
      # Should we replace the secondary node?
      if self.new_node is not None:
        fn = self._ExecDrbd8Secondary
      else:
        fn = self._ExecDrbd8DiskOnly

      result = fn(feedback_fn)
    finally:
      # Deactivate the instance disks if we're replacing them on a
      # down instance
      if activate_disks:
        _SafeShutdownInstanceDisks(self.lu, self.instance)

    assert not self.lu.owned_locks(locking.LEVEL_NODE)

    if __debug__:
      # Verify owned locks
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
      nodes = frozenset(self.node_secondary_ip)
      assert ((self.early_release and not owned_nodes) or
              (not self.early_release and not (set(owned_nodes) - nodes))), \
        ("Not owning the correct locks, early_release=%s, owned=%r,"
         " nodes=%r" % (self.early_release, owned_nodes, nodes))

    return result

  def _CheckVolumeGroup(self, nodes):
    self.lu.LogInfo("Checking volume groups")

    vgname = self.cfg.GetVGName()

    # Make sure volume group exists on all involved nodes
    results = self.rpc.call_vg_list(nodes)
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")

    for node in nodes:
      res = results[node]
      res.Raise("Error checking node %s" % node)
      if vgname not in res.payload:
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
                                 (vgname, node))

  def _CheckDisksExistence(self, nodes):
    # Check disk existence
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      for node in nodes:
        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
        self.cfg.SetDiskID(dev, node)

        result = _BlockdevFind(self, node, dev, self.instance)

        msg = result.fail_msg
        if msg or not result.payload:
          if not msg:
            msg = "disk not found"
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                   (idx, node, msg))

  def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                      (idx, node_name))

      if not CheckDiskConsistency(self.lu, self.instance, dev, node_name,
                                  on_primary, ldisk=ldisk):
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                                 " replace disks for instance %s" %
                                 (node_name, self.instance.name))

  def _CreateNewStorage(self, node_name):
    """Create new storage on the primary or secondary node.

    This is only used for same-node replaces, not for changing the
    secondary node, hence we don't want to modify the existing disk.

    """
    iv_names = {}

    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    for idx, dev in enumerate(disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx)

      self.cfg.SetDiskID(dev, node_name)

      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
      names = _GenerateUniqueNames(self.lu, lv_names)
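      # Each DRBD8 disk has two LV children: the data volume and a small
      # metadata volume (constants.DRBD_META_SIZE); a fresh, uniquely named
      # replacement is built for both of them below.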

      (data_disk, meta_disk) = dev.children
      vg_data = data_disk.logical_id[0]
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
                             logical_id=(vg_data, names[0]),
                             params=data_disk.params)
      vg_meta = meta_disk.logical_id[0]
      lv_meta = objects.Disk(dev_type=constants.LD_LV,
                             size=constants.DRBD_META_SIZE,
                             logical_id=(vg_meta, names[1]),
                             params=meta_disk.params)

      new_lvs = [lv_data, lv_meta]
      old_lvs = [child.Copy() for child in dev.children]
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)

      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)

    return iv_names

  def _CheckDevices(self, node_name, iv_names):
    for name, (dev, _, _) in iv_names.iteritems():
      self.cfg.SetDiskID(dev, node_name)

      result = _BlockdevFind(self, node_name, dev, self.instance)

      msg = result.fail_msg
      if msg or not result.payload:
        if not msg:
          msg = "disk not found"
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))

      if result.payload.is_degraded:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

  def _RemoveOldStorage(self, node_name, iv_names):
    for name, (_, old_lvs, _) in iv_names.iteritems():
      self.lu.LogInfo("Remove logical volumes for %s", name)

      for lv in old_lvs:
        self.cfg.SetDiskID(lv, node_name)

        msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
        if msg:
          self.lu.LogWarning("Can't remove old LV: %s", msg,
                             hint="remove unused LVs manually")

  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
    """Replace a disk on the primary or secondary for DRBD 8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.other_node, self.target_node])
    self._CheckVolumeGroup([self.target_node, self.other_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.other_node,
                                self.other_node == self.instance.primary_node,
                                False)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    iv_names = self._CreateNewStorage(self.target_node)

    # Step: for each lv, detach+rename*2+attach
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)

      result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
                                                     old_lvs)
      result.Raise("Can't detach drbd from local storage on node"
                   " %s for device %s" % (self.target_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)

      # Build the rename list based on what LVs exist on the node
      rename_old_to_new = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(self.target_node, to_ren)
        if not result.fail_msg and result.payload:
          # device exists
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))

      self.lu.LogInfo("Renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(self.target_node,
                                             rename_old_to_new)
      result.Raise("Can't rename old LVs on node %s" % self.target_node)

      # Now we rename the new LVs to the old LVs
      self.lu.LogInfo("Renaming the new LVs on the target node")
      rename_new_to_old = [(new, old.physical_id)
                           for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(self.target_node,
                                             rename_new_to_old)
      result.Raise("Can't rename new LVs on node %s" % self.target_node)

      # Intermediate steps of in memory modifications
      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        self.cfg.SetDiskID(new, self.target_node)

      # We need to modify old_lvs so that removal later removes the
      # right LVs, not the newly added ones; note that old_lvs is a
      # copy here
      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        self.cfg.SetDiskID(disk, self.target_node)

      # Now that the new lvs have the old name, we can add them to the device
      self.lu.LogInfo("Adding new mirror component on %s", self.target_node)
      result = self.rpc.call_blockdev_addchildren(self.target_node,
                                                  (dev, self.instance), new_lvs)
      msg = result.fail_msg
      if msg:
        for new_lv in new_lvs:
          msg2 = self.rpc.call_blockdev_remove(self.target_node,
                                               new_lv).fail_msg
          if msg2:
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                               hint=("cleanup manually the unused logical"
                                     " volumes"))
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)

    cstep = itertools.count(5)
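    # cstep numbers the two remaining steps (5 and 6) on the fly: with
    # early_release the old storage is removed before the sync wait below,
    # otherwise only after the devices have synced and been re-checked.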

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # Release all node locks while waiting for sync
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)

  def _ExecDrbd8Secondary(self, feedback_fn):
    """Replace the secondary node for DRBD 8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6

    pnode = self.instance.primary_node

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.instance.primary_node])
    self._CheckVolumeGroup([self.instance.primary_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.instance.primary_node, True, True)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
    for idx, dev in enumerate(disks):
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
                      (self.new_node, idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
                             True, GetInstanceInfoText(self.instance), False,
                             excl_stor)

    # Step 4: drbd minors and drbd setup changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    minors = self.cfg.AllocateDRBDMinor([self.new_node
                                         for dev in self.instance.disks],
                                        self.instance.name)
    logging.debug("Allocated minors %r", minors)

    iv_names = {}
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
                      (self.new_node, idx))
      # create new devices on new_node; note that we create two IDs:
      # one without port, so the drbd will be activated without
      # networking information on the new node at this stage, and one
      # with network, for the latter activation in step 4
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if self.instance.primary_node == o_node1:
        p_minor = o_minor1
      else:
        assert self.instance.primary_node == o_node2, "Three-node instance?"
        p_minor = o_minor2

      new_alone_id = (self.instance.primary_node, self.new_node, None,
                      p_minor, new_minor, o_secret)
      new_net_id = (self.instance.primary_node, self.new_node, o_port,
                    p_minor, new_minor, o_secret)

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children,
                              size=dev.size,
                              params={})
      (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
                                            self.cfg)
      try:
        CreateSingleBlockDev(self.lu, self.new_node, self.instance,
                             anno_new_drbd,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)
      except errors.GenericError:
        self.cfg.ReleaseDRBDMinors(self.instance.name)
        raise

    # We have new devices, shutdown the drbd on the old secondary
    for idx, dev in enumerate(self.instance.disks):
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
      self.cfg.SetDiskID(dev, self.target_node)
      msg = self.rpc.call_blockdev_shutdown(self.target_node,
                                            (dev, self.instance)).fail_msg
      if msg:
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                           " node: %s" % (idx, msg),
                           hint=("Please cleanup this device manually as"
                                 " soon as possible"))

    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
    result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
                                               self.instance.disks)[pnode]

    msg = result.fail_msg
    if msg:
      # detaches didn't succeed (unlikely)
      self.cfg.ReleaseDRBDMinors(self.instance.name)
      raise errors.OpExecError("Can't detach the disks from the network on"
                               " old node: %s" % (msg,))

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    self.lu.LogInfo("Updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      self.cfg.SetDiskID(dev, self.instance.primary_node)

    self.cfg.Update(self.instance, feedback_fn)

    # Release all node locks (the configuration has been updated)
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # and now perform the drbd attach
    self.lu.LogInfo("Attaching primary drbds to new secondary"
                    " (standalone => connected)")
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
                                            self.new_node],
                                           self.node_secondary_ip,
                                           (self.instance.disks, self.instance),
                                           self.instance.name,
                                           False)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
                           to_node, msg,
                           hint=("please do a gnt-instance info to see the"
                                 " status of disks"))

    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)