lib/cmdlib/instance_storage.py @ 0e514de1

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with storage of instances."""
23

    
24
import itertools
25
import logging
26
import os
27
import time
28

    
29
from ganeti import compat
30
from ganeti import constants
31
from ganeti import errors
32
from ganeti import ht
33
from ganeti import locking
34
from ganeti.masterd import iallocator
35
from ganeti import objects
36
from ganeti import utils
37
from ganeti import opcodes
38
from ganeti import rpc
39
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
40
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
41
  AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeName, \
42
  CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
43
  IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks
44
from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
45
  CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
46
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
47

    
48
import ganeti.masterd.instance
49

    
50

    
51
_DISK_TEMPLATE_NAME_PREFIX = {
52
  constants.DT_PLAIN: "",
53
  constants.DT_RBD: ".rbd",
54
  constants.DT_EXT: ".ext",
55
  }
56

    
57

    
58
_DISK_TEMPLATE_DEVICE_TYPE = {
59
  constants.DT_PLAIN: constants.LD_LV,
60
  constants.DT_FILE: constants.LD_FILE,
61
  constants.DT_SHARED_FILE: constants.LD_FILE,
62
  constants.DT_BLOCK: constants.LD_BLOCKDEV,
63
  constants.DT_RBD: constants.LD_RBD,
64
  constants.DT_EXT: constants.LD_EXT,
65
  }
66

    
67

    
68
def CreateSingleBlockDev(lu, node, instance, device, info, force_open,
69
                         excl_stor):
70
  """Create a single block device on a given node.
71

72
  This will not recurse over children of the device, so they must be
73
  created in advance.
74

75
  @param lu: the lu on whose behalf we execute
76
  @param node: the node on which to create the device
77
  @type instance: L{objects.Instance}
78
  @param instance: the instance which owns the device
79
  @type device: L{objects.Disk}
80
  @param device: the device to create
81
  @param info: the extra 'metadata' we should attach to the device
82
      (this will be represented as a LVM tag)
83
  @type force_open: boolean
84
  @param force_open: this parameter will be passed to the
85
      L{backend.BlockdevCreate} function where it specifies
86
      whether we run on primary or not, and it affects both
87
      the child assembly and the device's own Open() execution
88
  @type excl_stor: boolean
89
  @param excl_stor: Whether exclusive_storage is active for the node
90

91
  """
92
  lu.cfg.SetDiskID(device, node)
93
  result = lu.rpc.call_blockdev_create(node, device, device.size,
94
                                       instance.name, force_open, info,
95
                                       excl_stor)
96
  result.Raise("Can't create block device %s on"
97
               " node %s for instance %s" % (device, node, instance.name))
98
  if device.physical_id is None:
99
    device.physical_id = result.payload
100

    
101

    
102
def _CreateBlockDevInner(lu, node, instance, device, force_create,
103
                         info, force_open, excl_stor):
104
  """Create a tree of block devices on a given node.
105

106
  If this device type has to be created on secondaries, create it and
107
  all its children.
108

109
  If not, just recurse to children keeping the same 'force' value.
110

111
  @attention: The device has to be annotated already.
112

113
  @param lu: the lu on whose behalf we execute
114
  @param node: the node on which to create the device
115
  @type instance: L{objects.Instance}
116
  @param instance: the instance which owns the device
117
  @type device: L{objects.Disk}
118
  @param device: the device to create
119
  @type force_create: boolean
120
  @param force_create: whether to force creation of this device; this
121
      will be changed to True whenever we find a device which has
122
      CreateOnSecondary() attribute
123
  @param info: the extra 'metadata' we should attach to the device
124
      (this will be represented as a LVM tag)
125
  @type force_open: boolean
126
  @param force_open: this parameter will be passed to the
127
      L{backend.BlockdevCreate} function where it specifies
128
      whether we run on primary or not, and it affects both
129
      the child assembly and the device's own Open() execution
130
  @type excl_stor: boolean
131
  @param excl_stor: Whether exclusive_storage is active for the node
132

133
  @return: list of created devices
134
  """
135
  created_devices = []
136
  try:
137
    if device.CreateOnSecondary():
138
      force_create = True
139

    
140
    if device.children:
141
      for child in device.children:
142
        devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
143
                                    info, force_open, excl_stor)
144
        created_devices.extend(devs)
145

    
146
    if not force_create:
147
      return created_devices
148

    
149
    CreateSingleBlockDev(lu, node, instance, device, info, force_open,
150
                         excl_stor)
151
    # The device has been completely created, so there is no point in keeping
152
    # its subdevices in the list. We just add the device itself instead.
153
    created_devices = [(node, device)]
154
    return created_devices
155

    
156
  except errors.DeviceCreationError, e:
157
    e.created_devices.extend(created_devices)
158
    raise e
159
  except errors.OpExecError, e:
160
    raise errors.DeviceCreationError(str(e), created_devices)
161

    
162

    
163
def IsExclusiveStorageEnabledNodeName(cfg, nodename):
164
  """Whether exclusive_storage is in effect for the given node.
165

166
  @type cfg: L{config.ConfigWriter}
167
  @param cfg: The cluster configuration
168
  @type nodename: string
169
  @param nodename: The node
170
  @rtype: bool
171
  @return: The effective value of exclusive_storage
172
  @raise errors.OpPrereqError: if no node exists with the given name
173

174
  """
175
  ni = cfg.GetNodeInfo(nodename)
176
  if ni is None:
177
    raise errors.OpPrereqError("Invalid node name %s" % nodename,
178
                               errors.ECODE_NOENT)
179
  return IsExclusiveStorageEnabledNode(cfg, ni)
180

    
181

    
182
def _CreateBlockDev(lu, node, instance, device, force_create, info,
183
                    force_open):
184
  """Wrapper around L{_CreateBlockDevInner}.
185

186
  This method annotates the root device first.
187

188
  """
189
  (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
190
  excl_stor = IsExclusiveStorageEnabledNodeName(lu.cfg, node)
191
  return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
192
                              force_open, excl_stor)
193

    
194

    
195
def _UndoCreateDisks(lu, disks_created):
196
  """Undo the work performed by L{CreateDisks}.
197

198
  This function is called in case of an error to undo the work of
199
  L{CreateDisks}.
200

201
  @type lu: L{LogicalUnit}
202
  @param lu: the logical unit on whose behalf we execute
203
  @param disks_created: the result returned by L{CreateDisks}
204

205
  """
206
  for (node, disk) in disks_created:
207
    lu.cfg.SetDiskID(disk, node)
208
    result = lu.rpc.call_blockdev_remove(node, disk)
209
    if result.fail_msg:
210
      logging.warning("Failed to remove newly-created disk %s on node %s:"
211
                      " %s", disk, node, result.fail_msg)
212

    
213

    
214
def CreateDisks(lu, instance, to_skip=None, target_node=None, disks=None):
215
  """Create all disks for an instance.
216

217
  This abstracts away some work from AddInstance.
218

219
  @type lu: L{LogicalUnit}
220
  @param lu: the logical unit on whose behalf we execute
221
  @type instance: L{objects.Instance}
222
  @param instance: the instance whose disks we should create
223
  @type to_skip: list
224
  @param to_skip: list of indices to skip
225
  @type target_node: string
226
  @param target_node: if passed, overrides the target node for creation
227
  @type disks: list of {objects.Disk}
228
  @param disks: the disks to create; if not specified, all the disks of the
229
      instance are created
230
  @return: information about the created disks, to be used to call
231
      L{_UndoCreateDisks}
232
  @raise errors.OpPrereqError: in case of error
233

234
  """
235
  info = GetInstanceInfoText(instance)
236
  if target_node is None:
237
    pnode = instance.primary_node
238
    all_nodes = instance.all_nodes
239
  else:
240
    pnode = target_node
241
    all_nodes = [pnode]
242

    
243
  if disks is None:
244
    disks = instance.disks
245

    
246
  if instance.disk_template in constants.DTS_FILEBASED:
247
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
248
    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
249

    
250
    result.Raise("Failed to create directory '%s' on"
251
                 " node %s" % (file_storage_dir, pnode))
252

    
253
  disks_created = []
254
  for idx, device in enumerate(disks):
255
    if to_skip and idx in to_skip:
256
      continue
257
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
258
    for node in all_nodes:
259
      f_create = node == pnode
260
      try:
261
        _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
262
        disks_created.append((node, device))
263
      except errors.DeviceCreationError, e:
264
        logging.warning("Creating disk %s for instance '%s' failed",
265
                        idx, instance.name)
266
        disks_created.extend(e.created_devices)
267
        _UndoCreateDisks(lu, disks_created)
268
        raise errors.OpExecError(e.message)
269
  return disks_created
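
# Illustrative sketch (not part of the original module): the list returned by
# CreateDisks is kept so that partially created devices can be removed with
# _UndoCreateDisks if a later step fails.  "_ExampleCreateAndWipe" is a
# hypothetical helper; WipeOrCleanupDisks below implements the same pattern.
def _ExampleCreateAndWipe(lu, instance):
  disks_created = CreateDisks(lu, instance)
  try:
    WipeDisks(lu, instance)
  except errors.OpExecError:
    # Clean up whatever CreateDisks managed to build before re-raising
    _UndoCreateDisks(lu, disks_created)
    raise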
270

    
271

    
272
def ComputeDiskSizePerVG(disk_template, disks):
273
  """Compute disk size requirements in the volume group
274

275
  """
276
  def _compute(disks, payload):
277
    """Universal algorithm.
278

279
    """
280
    vgs = {}
281
    for disk in disks:
282
      vgs[disk[constants.IDISK_VG]] = \
283
        vgs.get(disk[constants.IDISK_VG], 0) + \
        disk[constants.IDISK_SIZE] + payload
284

    
285
    return vgs
286

    
287
  # Required free disk space as a function of disk and swap space
288
  req_size_dict = {
289
    constants.DT_DISKLESS: {},
290
    constants.DT_PLAIN: _compute(disks, 0),
291
    # 128 MB are added for drbd metadata for each disk
292
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
293
    constants.DT_FILE: {},
294
    constants.DT_SHARED_FILE: {},
295
    }
296

    
297
  if disk_template not in req_size_dict:
298
    raise errors.ProgrammerError("Disk template '%s' size requirement"
299
                                 " is unknown" % disk_template)
300

    
301
  return req_size_dict[disk_template]
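
# Illustrative sketch (not part of the original module): computing the per-VG
# space requirements for a hypothetical two-disk DRBD instance.  The disk
# dictionaries and the "xenvg" volume group name are made-up example values.
def _ExampleComputeDiskSizePerVG():
  example_disks = [
    {constants.IDISK_VG: "xenvg", constants.IDISK_SIZE: 10240},
    {constants.IDISK_VG: "xenvg", constants.IDISK_SIZE: 2048},
    ]
  # For DT_DRBD8, DRBD_META_SIZE is added on top of each disk's data size;
  # the result maps each volume group name to the MiB required on it.
  return ComputeDiskSizePerVG(constants.DT_DRBD8, example_disks)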
302

    
303

    
304
def ComputeDisks(op, default_vg):
305
  """Computes the instance disks.
306

307
  @param op: The instance opcode
308
  @param default_vg: The default_vg to assume
309

310
  @return: The computed disks
311

312
  """
313
  disks = []
314
  for disk in op.disks:
315
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
316
    if mode not in constants.DISK_ACCESS_SET:
317
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
318
                                 mode, errors.ECODE_INVAL)
319
    size = disk.get(constants.IDISK_SIZE, None)
320
    if size is None:
321
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
322
    try:
323
      size = int(size)
324
    except (TypeError, ValueError):
325
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
326
                                 errors.ECODE_INVAL)
327

    
328
    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
329
    if ext_provider and op.disk_template != constants.DT_EXT:
330
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
331
                                 " disk template, not %s" %
332
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
333
                                  op.disk_template), errors.ECODE_INVAL)
334

    
335
    data_vg = disk.get(constants.IDISK_VG, default_vg)
336
    name = disk.get(constants.IDISK_NAME, None)
337
    if name is not None and name.lower() == constants.VALUE_NONE:
338
      name = None
339
    new_disk = {
340
      constants.IDISK_SIZE: size,
341
      constants.IDISK_MODE: mode,
342
      constants.IDISK_VG: data_vg,
343
      constants.IDISK_NAME: name,
344
      }
345

    
346
    for key in [
347
      constants.IDISK_METAVG,
348
      constants.IDISK_ADOPT,
349
      constants.IDISK_SPINDLES,
350
      ]:
351
      if key in disk:
352
        new_disk[key] = disk[key]
353

    
354
    # For extstorage, demand the `provider' option and add any
355
    # additional parameters (ext-params) to the dict
356
    if op.disk_template == constants.DT_EXT:
357
      if ext_provider:
358
        new_disk[constants.IDISK_PROVIDER] = ext_provider
359
        for key in disk:
360
          if key not in constants.IDISK_PARAMS:
361
            new_disk[key] = disk[key]
362
      else:
363
        raise errors.OpPrereqError("Missing provider for template '%s'" %
364
                                   constants.DT_EXT, errors.ECODE_INVAL)
365

    
366
    disks.append(new_disk)
367

    
368
  return disks
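
# Illustrative sketch (not part of the original module): ComputeDisks only
# needs the "disks" and "disk_template" fields of the opcode, so a minimal
# stand-in object is enough to show the normalization.  "_FakeCreateOp" is a
# made-up container; real callers pass an instance-creation opcode.
def _ExampleComputeDisks():
  class _FakeCreateOp(object):
    disk_template = constants.DT_PLAIN
    disks = [{constants.IDISK_SIZE: 10240}]
  # Returns one dict per disk with size, mode, vg and name filled in
  return ComputeDisks(_FakeCreateOp(), "xenvg")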
369

    
370

    
371
def CheckRADOSFreeSpace():
372
  """Compute disk size requirements inside the RADOS cluster.
373

374
  """
375
  # For the RADOS cluster we assume there is always enough space.
376
  pass
377

    
378

    
379
def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
380
                         iv_name, p_minor, s_minor):
381
  """Generate a drbd8 device complete with its children.
382

383
  """
384
  assert len(vgnames) == len(names) == 2
385
  port = lu.cfg.AllocatePort()
386
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
387

    
388
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
389
                          logical_id=(vgnames[0], names[0]),
390
                          params={})
391
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
392
  dev_meta = objects.Disk(dev_type=constants.LD_LV,
393
                          size=constants.DRBD_META_SIZE,
394
                          logical_id=(vgnames[1], names[1]),
395
                          params={})
396
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
397
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
398
                          logical_id=(primary, secondary, port,
399
                                      p_minor, s_minor,
400
                                      shared_secret),
401
                          children=[dev_data, dev_meta],
402
                          iv_name=iv_name, params={})
403
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
404
  return drbd_dev
405

    
406

    
407
def GenerateDiskTemplate(
408
  lu, template_name, instance_name, primary_node, secondary_nodes,
409
  disk_info, file_storage_dir, file_driver, base_index,
410
  feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
411
  _req_shr_file_storage=opcodes.RequireSharedFileStorage):
412
  """Generate the entire disk layout for a given template type.
413

414
  """
415
  vgname = lu.cfg.GetVGName()
416
  disk_count = len(disk_info)
417
  disks = []
418

    
419
  if template_name == constants.DT_DISKLESS:
420
    pass
421
  elif template_name == constants.DT_DRBD8:
422
    if len(secondary_nodes) != 1:
423
      raise errors.ProgrammerError("Wrong template configuration")
424
    remote_node = secondary_nodes[0]
425
    minors = lu.cfg.AllocateDRBDMinor(
426
      [primary_node, remote_node] * len(disk_info), instance_name)
427

    
428
    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
429
                                                       full_disk_params)
430
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
431

    
432
    names = []
433
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
434
                                               for i in range(disk_count)]):
435
      names.append(lv_prefix + "_data")
436
      names.append(lv_prefix + "_meta")
437
    for idx, disk in enumerate(disk_info):
438
      disk_index = idx + base_index
439
      data_vg = disk.get(constants.IDISK_VG, vgname)
440
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
441
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
442
                                      disk[constants.IDISK_SIZE],
443
                                      [data_vg, meta_vg],
444
                                      names[idx * 2:idx * 2 + 2],
445
                                      "disk/%d" % disk_index,
446
                                      minors[idx * 2], minors[idx * 2 + 1])
447
      disk_dev.mode = disk[constants.IDISK_MODE]
448
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
449
      disks.append(disk_dev)
450
  else:
451
    if secondary_nodes:
452
      raise errors.ProgrammerError("Wrong template configuration")
453

    
454
    if template_name == constants.DT_FILE:
455
      _req_file_storage()
456
    elif template_name == constants.DT_SHARED_FILE:
457
      _req_shr_file_storage()
458

    
459
    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
460
    if name_prefix is None:
461
      names = None
462
    else:
463
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
464
                                        (name_prefix, base_index + i)
465
                                        for i in range(disk_count)])
466

    
467
    if template_name == constants.DT_PLAIN:
468

    
469
      def logical_id_fn(idx, _, disk):
470
        vg = disk.get(constants.IDISK_VG, vgname)
471
        return (vg, names[idx])
472

    
473
    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
474
      logical_id_fn = \
475
        lambda _, disk_index, disk: (file_driver,
476
                                     "%s/disk%d" % (file_storage_dir,
477
                                                    disk_index))
478
    elif template_name == constants.DT_BLOCK:
479
      logical_id_fn = \
480
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
481
                                       disk[constants.IDISK_ADOPT])
482
    elif template_name == constants.DT_RBD:
483
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
484
    elif template_name == constants.DT_EXT:
485
      def logical_id_fn(idx, _, disk):
486
        provider = disk.get(constants.IDISK_PROVIDER, None)
487
        if provider is None:
488
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
489
                                       " not found", constants.DT_EXT,
490
                                       constants.IDISK_PROVIDER)
491
        return (provider, names[idx])
492
    else:
493
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
494

    
495
    dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
496

    
497
    for idx, disk in enumerate(disk_info):
498
      params = {}
499
      # Only for the Ext template, add disk_info to params
500
      if template_name == constants.DT_EXT:
501
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
502
        for key in disk:
503
          if key not in constants.IDISK_PARAMS:
504
            params[key] = disk[key]
505
      disk_index = idx + base_index
506
      size = disk[constants.IDISK_SIZE]
507
      feedback_fn("* disk %s, size %s" %
508
                  (disk_index, utils.FormatUnit(size, "h")))
509
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
510
                              logical_id=logical_id_fn(idx, disk_index, disk),
511
                              iv_name="disk/%d" % disk_index,
512
                              mode=disk[constants.IDISK_MODE],
513
                              params=params,
514
                              spindles=disk.get(constants.IDISK_SPINDLES))
515
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
516
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
517
      disks.append(disk_dev)
518

    
519
  return disks
520

    
521

    
522
def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
523
  """Check the presence of the spindle options with exclusive_storage.
524

525
  @type diskdict: dict
526
  @param diskdict: disk parameters
527
  @type es_flag: bool
528
  @param es_flag: the effective value of the exclusive_storage flag
529
  @type required: bool
530
  @param required: whether spindles are required or just optional
531
  @raise errors.OpPrereqError: when spindles are given and they should not be
532

533
  """
534
  if (not es_flag and constants.IDISK_SPINDLES in diskdict and
535
      diskdict[constants.IDISK_SPINDLES] is not None):
536
    raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
537
                               " when exclusive storage is not active",
538
                               errors.ECODE_INVAL)
539
  if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or
540
                                diskdict[constants.IDISK_SPINDLES] is None)):
541
    raise errors.OpPrereqError("You must specify spindles in instance disks"
542
                               " when exclusive storage is active",
543
                               errors.ECODE_INVAL)
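
# Illustrative sketch (not part of the original module): the first call below
# is accepted (spindles given while exclusive storage is active); the second
# raises OpPrereqError, since spindles must not be given when exclusive
# storage is off.  The disk dictionaries are made-up example values.
def _ExampleSpindleChecks():
  CheckSpindlesExclusiveStorage({constants.IDISK_SIZE: 1024,
                                 constants.IDISK_SPINDLES: 2}, True, True)
  CheckSpindlesExclusiveStorage({constants.IDISK_SIZE: 1024,
                                 constants.IDISK_SPINDLES: 2}, False, False)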
544

    
545

    
546
class LUInstanceRecreateDisks(LogicalUnit):
547
  """Recreate an instance's missing disks.
548

549
  """
550
  HPATH = "instance-recreate-disks"
551
  HTYPE = constants.HTYPE_INSTANCE
552
  REQ_BGL = False
553

    
554
  _MODIFYABLE = compat.UniqueFrozenset([
555
    constants.IDISK_SIZE,
556
    constants.IDISK_MODE,
557
    constants.IDISK_SPINDLES,
558
    ])
559

    
560
  # New or changed disk parameters may have different semantics
561
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
562
    constants.IDISK_ADOPT,
563

    
564
    # TODO: Implement support changing VG while recreating
565
    constants.IDISK_VG,
566
    constants.IDISK_METAVG,
567
    constants.IDISK_PROVIDER,
568
    constants.IDISK_NAME,
569
    ]))
570

    
571
  def _RunAllocator(self):
572
    """Run the allocator based on input opcode.
573

574
    """
575
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
576

    
577
    # FIXME
578
    # The allocator should actually run in "relocate" mode, but current
579
    # allocators don't support relocating all the nodes of an instance at
580
    # the same time. As a workaround we use "allocate" mode, but this is
581
    # suboptimal for two reasons:
582
    # - The instance name passed to the allocator is present in the list of
583
    #   existing instances, so there could be a conflict within the
584
    #   internal structures of the allocator. This doesn't happen with the
585
    #   current allocators, but it's a liability.
586
    # - The allocator counts the resources used by the instance twice: once
587
    #   because the instance exists already, and once because it tries to
588
    #   allocate a new instance.
589
    # The allocator could choose some of the nodes on which the instance is
590
    # running, but that's not a problem. If the instance nodes are broken,
591
    # they should be already be marked as drained or offline, and hence
592
    # skipped by the allocator. If instance disks have been lost for other
593
    # reasons, then recreating the disks on the same nodes should be fine.
594
    disk_template = self.instance.disk_template
595
    spindle_use = be_full[constants.BE_SPINDLE_USE]
596
    disks = [{
597
      constants.IDISK_SIZE: d.size,
598
      constants.IDISK_MODE: d.mode,
599
      constants.IDISK_SPINDLES: d.spindles,
600
      } for d in self.instance.disks]
601
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
602
                                        disk_template=disk_template,
603
                                        tags=list(self.instance.GetTags()),
604
                                        os=self.instance.os,
605
                                        nics=[{}],
606
                                        vcpus=be_full[constants.BE_VCPUS],
607
                                        memory=be_full[constants.BE_MAXMEM],
608
                                        spindle_use=spindle_use,
609
                                        disks=disks,
610
                                        hypervisor=self.instance.hypervisor,
611
                                        node_whitelist=None)
612
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
613

    
614
    ial.Run(self.op.iallocator)
615

    
616
    assert req.RequiredNodes() == len(self.instance.all_nodes)
617

    
618
    if not ial.success:
619
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
620
                                 " %s" % (self.op.iallocator, ial.info),
621
                                 errors.ECODE_NORES)
622

    
623
    self.op.nodes = ial.result
624
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
625
                 self.op.instance_name, self.op.iallocator,
626
                 utils.CommaJoin(ial.result))
627

    
628
  def CheckArguments(self):
629
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
630
      # Normalize and convert deprecated list of disk indices
631
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
632

    
633
    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
634
    if duplicates:
635
      raise errors.OpPrereqError("Some disks have been specified more than"
636
                                 " once: %s" % utils.CommaJoin(duplicates),
637
                                 errors.ECODE_INVAL)
638

    
639
    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
640
    # when neither iallocator nor nodes are specified
641
    if self.op.iallocator or self.op.nodes:
642
      CheckIAllocatorOrNode(self, "iallocator", "nodes")
643

    
644
    for (idx, params) in self.op.disks:
645
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
646
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
647
      if unsupported:
648
        raise errors.OpPrereqError("Parameters for disk %s try to change"
649
                                   " unmodifyable parameter(s): %s" %
650
                                   (idx, utils.CommaJoin(unsupported)),
651
                                   errors.ECODE_INVAL)
652

    
653
  def ExpandNames(self):
654
    self._ExpandAndLockInstance()
655
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
656

    
657
    if self.op.nodes:
658
      self.op.nodes = [ExpandNodeName(self.cfg, n) for n in self.op.nodes]
659
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
660
    else:
661
      self.needed_locks[locking.LEVEL_NODE] = []
662
      if self.op.iallocator:
663
        # iallocator will select a new node in the same group
664
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
665
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
666

    
667
    self.needed_locks[locking.LEVEL_NODE_RES] = []
668

    
669
  def DeclareLocks(self, level):
670
    if level == locking.LEVEL_NODEGROUP:
671
      assert self.op.iallocator is not None
672
      assert not self.op.nodes
673
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
674
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
675
      # Lock the primary group used by the instance optimistically; this
676
      # requires going via the node before it's locked, requiring
677
      # verification later on
678
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
679
        self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)
680

    
681
    elif level == locking.LEVEL_NODE:
682
      # If an allocator is used, then we lock all the nodes in the current
683
      # instance group, as we don't know yet which ones will be selected;
684
      # if we replace the nodes without using an allocator, locks are
685
      # already declared in ExpandNames; otherwise, we need to lock all the
686
      # instance nodes for disk re-creation
687
      if self.op.iallocator:
688
        assert not self.op.nodes
689
        assert not self.needed_locks[locking.LEVEL_NODE]
690
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
691

    
692
        # Lock member nodes of the group of the primary node
693
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
694
          self.needed_locks[locking.LEVEL_NODE].extend(
695
            self.cfg.GetNodeGroup(group_uuid).members)
696

    
697
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
698
      elif not self.op.nodes:
699
        self._LockInstancesNodes(primary_only=False)
700
    elif level == locking.LEVEL_NODE_RES:
701
      # Copy node locks
702
      self.needed_locks[locking.LEVEL_NODE_RES] = \
703
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])
704

    
705
  def BuildHooksEnv(self):
706
    """Build hooks env.
707

708
    This runs on master, primary and secondary nodes of the instance.
709

710
    """
711
    return BuildInstanceHookEnvByObject(self, self.instance)
712

    
713
  def BuildHooksNodes(self):
714
    """Build hooks nodes.
715

716
    """
717
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
718
    return (nl, nl)
719

    
720
  def CheckPrereq(self):
721
    """Check prerequisites.
722

723
    This checks that the instance is in the cluster and is not running.
724

725
    """
726
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
727
    assert instance is not None, \
728
      "Cannot retrieve locked instance %s" % self.op.instance_name
729
    if self.op.nodes:
730
      if len(self.op.nodes) != len(instance.all_nodes):
731
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
732
                                   " %d replacement nodes were specified" %
733
                                   (instance.name, len(instance.all_nodes),
734
                                    len(self.op.nodes)),
735
                                   errors.ECODE_INVAL)
736
      assert instance.disk_template != constants.DT_DRBD8 or \
737
             len(self.op.nodes) == 2
738
      assert instance.disk_template != constants.DT_PLAIN or \
739
             len(self.op.nodes) == 1
740
      primary_node = self.op.nodes[0]
741
    else:
742
      primary_node = instance.primary_node
743
    if not self.op.iallocator:
744
      CheckNodeOnline(self, primary_node)
745

    
746
    if instance.disk_template == constants.DT_DISKLESS:
747
      raise errors.OpPrereqError("Instance '%s' has no disks" %
748
                                 self.op.instance_name, errors.ECODE_INVAL)
749

    
750
    # Verify if node group locks are still correct
751
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
752
    if owned_groups:
753
      # Node group locks are acquired only for the primary node (and only
754
      # when the allocator is used)
755
      CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
756
                              primary_only=True)
757

    
758
    # if we replace nodes *and* the old primary is offline, we don't
759
    # check the instance state
760
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
761
    if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
762
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
763
                         msg="cannot recreate disks")
764

    
765
    if self.op.disks:
766
      self.disks = dict(self.op.disks)
767
    else:
768
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
769

    
770
    maxidx = max(self.disks.keys())
771
    if maxidx >= len(instance.disks):
772
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
773
                                 errors.ECODE_INVAL)
774

    
775
    if ((self.op.nodes or self.op.iallocator) and
776
         sorted(self.disks.keys()) != range(len(instance.disks))):
777
      raise errors.OpPrereqError("Can't recreate disks partially and"
778
                                 " change the nodes at the same time",
779
                                 errors.ECODE_INVAL)
780

    
781
    self.instance = instance
782

    
783
    if self.op.iallocator:
784
      self._RunAllocator()
785
      # Release unneeded node and node resource locks
786
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
787
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
788
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
789

    
790
    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
791

    
792
    if self.op.nodes:
793
      nodes = self.op.nodes
794
    else:
795
      nodes = instance.all_nodes
796
    excl_stor = compat.any(
797
      rpc.GetExclusiveStorageForNodeNames(self.cfg, nodes).values()
798
      )
799
    for new_params in self.disks.values():
800
      CheckSpindlesExclusiveStorage(new_params, excl_stor, False)
801

    
802
  def Exec(self, feedback_fn):
803
    """Recreate the disks.
804

805
    """
806
    instance = self.instance
807

    
808
    assert (self.owned_locks(locking.LEVEL_NODE) ==
809
            self.owned_locks(locking.LEVEL_NODE_RES))
810

    
811
    to_skip = []
812
    mods = [] # keeps track of needed changes
813

    
814
    for idx, disk in enumerate(instance.disks):
815
      try:
816
        changes = self.disks[idx]
817
      except KeyError:
818
        # Disk should not be recreated
819
        to_skip.append(idx)
820
        continue
821

    
822
      # update secondaries for disks, if needed
823
      if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
824
        # need to update the nodes and minors
825
        assert len(self.op.nodes) == 2
826
        assert len(disk.logical_id) == 6 # otherwise disk internals
827
                                         # have changed
828
        (_, _, old_port, _, _, old_secret) = disk.logical_id
829
        new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
830
        new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
831
                  new_minors[0], new_minors[1], old_secret)
832
        assert len(disk.logical_id) == len(new_id)
833
      else:
834
        new_id = None
835

    
836
      mods.append((idx, new_id, changes))
837

    
838
    # now that we have passed all asserts above, we can apply the mods
839
    # in a single run (to avoid partial changes)
840
    for idx, new_id, changes in mods:
841
      disk = instance.disks[idx]
842
      if new_id is not None:
843
        assert disk.dev_type == constants.LD_DRBD8
844
        disk.logical_id = new_id
845
      if changes:
846
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
847
                    mode=changes.get(constants.IDISK_MODE, None),
848
                    spindles=changes.get(constants.IDISK_SPINDLES, None))
849

    
850
    # change primary node, if needed
851
    if self.op.nodes:
852
      instance.primary_node = self.op.nodes[0]
853
      self.LogWarning("Changing the instance's nodes, you will have to"
854
                      " remove any disks left on the older nodes manually")
855

    
856
    if self.op.nodes:
857
      self.cfg.Update(instance, feedback_fn)
858

    
859
    # All touched nodes must be locked
860
    mylocks = self.owned_locks(locking.LEVEL_NODE)
861
    assert mylocks.issuperset(frozenset(instance.all_nodes))
862
    new_disks = CreateDisks(self, instance, to_skip=to_skip)
863

    
864
    # TODO: Release node locks before wiping, or explain why it's not possible
865
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
866
      wipedisks = [(idx, disk, 0)
867
                   for (idx, disk) in enumerate(instance.disks)
868
                   if idx not in to_skip]
869
      WipeOrCleanupDisks(self, instance, disks=wipedisks, cleanup=new_disks)
870

    
871

    
872
def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
873
  """Checks if nodes have enough free disk space in the specified VG.
874

875
  This function checks if all given nodes have the needed amount of
876
  free disk. In case any node has less disk or we cannot get the
877
  information from the node, this function raises an OpPrereqError
878
  exception.
879

880
  @type lu: C{LogicalUnit}
881
  @param lu: a logical unit from which we get configuration data
882
  @type nodenames: C{list}
883
  @param nodenames: the list of node names to check
884
  @type vg: C{str}
885
  @param vg: the volume group to check
886
  @type requested: C{int}
887
  @param requested: the amount of disk in MiB to check for
888
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
889
      or we cannot check the node
890

891
  """
892
  es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
893
  # FIXME: This maps everything to storage type 'lvm-vg' to maintain
894
  # the current functionality. Refactor to make it more flexible.
895
  nodeinfo = lu.rpc.call_node_info(nodenames, [(constants.ST_LVM_VG, vg)], None,
896
                                   es_flags)
897
  for node in nodenames:
898
    info = nodeinfo[node]
899
    info.Raise("Cannot get current information from node %s" % node,
900
               prereq=True, ecode=errors.ECODE_ENVIRON)
901
    (_, (vg_info, ), _) = info.payload
902
    vg_free = vg_info.get("vg_free", None)
903
    if not isinstance(vg_free, int):
904
      raise errors.OpPrereqError("Can't compute free disk space on node"
905
                                 " %s for vg %s, result was '%s'" %
906
                                 (node, vg, vg_free), errors.ECODE_ENVIRON)
907
    if requested > vg_free:
908
      raise errors.OpPrereqError("Not enough disk space on target node %s"
909
                                 " vg %s: required %d MiB, available %d MiB" %
910
                                 (node, vg, requested, vg_free),
911
                                 errors.ECODE_NORES)
912

    
913

    
914
def CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
915
  """Checks if nodes have enough free disk space in all the VGs.
916

917
  This function checks if all given nodes have the needed amount of
918
  free disk. In case any node has less disk or we cannot get the
919
  information from the node, this function raises an OpPrereqError
920
  exception.
921

922
  @type lu: C{LogicalUnit}
923
  @param lu: a logical unit from which we get configuration data
924
  @type nodenames: C{list}
925
  @param nodenames: the list of node names to check
926
  @type req_sizes: C{dict}
927
  @param req_sizes: the hash of vg and corresponding amount of disk in
928
      MiB to check for
929
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
930
      or we cannot check the node
931

932
  """
933
  for vg, req_size in req_sizes.items():
934
    _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
935

    
936

    
937
def _DiskSizeInBytesToMebibytes(lu, size):
938
  """Converts a disk size in bytes to mebibytes.
939

940
  Warns and rounds up if the size isn't an even multiple of 1 MiB.
941

942
  """
943
  (mib, remainder) = divmod(size, 1024 * 1024)
944

    
945
  if remainder != 0:
946
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
947
                  " to not overwrite existing data (%s bytes will not be"
948
                  " wiped)", (1024 * 1024) - remainder)
949
    mib += 1
950

    
951
  return mib
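
# Illustrative sketch (not part of the original module): the function above
# only uses lu.LogWarning, so a minimal stand-in logical unit suffices here.
# One GiB plus one byte rounds up to 1025 MiB (and a warning is logged).
def _ExampleDiskSizeRounding():
  class _FakeLU(object):
    @staticmethod
    def LogWarning(*args):
      pass
  return _DiskSizeInBytesToMebibytes(_FakeLU(), 1024 * 1024 * 1024 + 1)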
952

    
953

    
954
def _CalcEta(time_taken, written, total_size):
955
  """Calculates the ETA based on size written and total size.
956

957
  @param time_taken: The time taken so far
958
  @param written: amount written so far
959
  @param total_size: The total size of data to be written
960
  @return: The remaining time in seconds
961

962
  """
963
  avg_time = time_taken / float(written)
964
  return (total_size - written) * avg_time
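
# Illustrative example (not part of the original module): 512 MiB written out
# of 4096 MiB in 30 seconds averages 30/512.0 seconds per MiB, so the
# remaining 3584 MiB are estimated to take about 210 more seconds.
def _ExampleCalcEta():
  return _CalcEta(30, 512, 4096)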
965

    
966

    
967
def WipeDisks(lu, instance, disks=None):
968
  """Wipes instance disks.
969

970
  @type lu: L{LogicalUnit}
971
  @param lu: the logical unit on whose behalf we execute
972
  @type instance: L{objects.Instance}
973
  @param instance: the instance whose disks we should wipe
974
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
975
  @param disks: Disk details; tuple contains disk index, disk object and the
976
    start offset
977

978
  """
979
  node = instance.primary_node
980

    
981
  if disks is None:
982
    disks = [(idx, disk, 0)
983
             for (idx, disk) in enumerate(instance.disks)]
984

    
985
  for (_, device, _) in disks:
986
    lu.cfg.SetDiskID(device, node)
987

    
988
  logging.info("Pausing synchronization of disks of instance '%s'",
989
               instance.name)
990
  result = lu.rpc.call_blockdev_pause_resume_sync(node,
991
                                                  (map(compat.snd, disks),
992
                                                   instance),
993
                                                  True)
994
  result.Raise("Failed to pause disk synchronization on node '%s'" % node)
995

    
996
  for idx, success in enumerate(result.payload):
997
    if not success:
998
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
999
                   " failed", idx, instance.name)
1000

    
1001
  try:
1002
    for (idx, device, offset) in disks:
1003
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
1004
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
1005
      wipe_chunk_size = \
1006
        int(min(constants.MAX_WIPE_CHUNK,
1007
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
1008

    
1009
      size = device.size
1010
      last_output = 0
1011
      start_time = time.time()
1012

    
1013
      if offset == 0:
1014
        info_text = ""
1015
      else:
1016
        info_text = (" (from %s to %s)" %
1017
                     (utils.FormatUnit(offset, "h"),
1018
                      utils.FormatUnit(size, "h")))
1019

    
1020
      lu.LogInfo("* Wiping disk %s%s", idx, info_text)
1021

    
1022
      logging.info("Wiping disk %d for instance %s on node %s using"
1023
                   " chunk size %s", idx, instance.name, node, wipe_chunk_size)
1024

    
1025
      while offset < size:
1026
        wipe_size = min(wipe_chunk_size, size - offset)
1027

    
1028
        logging.debug("Wiping disk %d, offset %s, chunk %s",
1029
                      idx, offset, wipe_size)
1030

    
1031
        result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
1032
                                           wipe_size)
1033
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
1034
                     (idx, offset, wipe_size))
1035

    
1036
        now = time.time()
1037
        offset += wipe_size
1038
        if now - last_output >= 60:
1039
          eta = _CalcEta(now - start_time, offset, size)
1040
          lu.LogInfo(" - done: %.1f%% ETA: %s",
1041
                     offset / float(size) * 100, utils.FormatSeconds(eta))
1042
          last_output = now
1043
  finally:
1044
    logging.info("Resuming synchronization of disks for instance '%s'",
1045
                 instance.name)
1046

    
1047
    result = lu.rpc.call_blockdev_pause_resume_sync(node,
1048
                                                    (map(compat.snd, disks),
1049
                                                     instance),
1050
                                                    False)
1051

    
1052
    if result.fail_msg:
1053
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
1054
                    node, result.fail_msg)
1055
    else:
1056
      for idx, success in enumerate(result.payload):
1057
        if not success:
1058
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
1059
                        " failed", idx, instance.name)
1060

    
1061

    
1062
def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
1063
  """Wrapper for L{WipeDisks} that handles errors.
1064

1065
  @type lu: L{LogicalUnit}
1066
  @param lu: the logical unit on whose behalf we execute
1067
  @type instance: L{objects.Instance}
1068
  @param instance: the instance whose disks we should wipe
1069
  @param disks: see L{WipeDisks}
1070
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
1071
      case of error
1072
  @raise errors.OpPrereqError: in case of failure
1073

1074
  """
1075
  try:
1076
    WipeDisks(lu, instance, disks=disks)
1077
  except errors.OpExecError:
1078
    logging.warning("Wiping disks for instance '%s' failed",
1079
                    instance.name)
1080
    _UndoCreateDisks(lu, cleanup)
1081
    raise
1082

    
1083

    
1084
def ExpandCheckDisks(instance, disks):
1085
  """Return the instance disks selected by the disks list
1086

1087
  @type disks: list of L{objects.Disk} or None
1088
  @param disks: selected disks
1089
  @rtype: list of L{objects.Disk}
1090
  @return: selected instance disks to act on
1091

1092
  """
1093
  if disks is None:
1094
    return instance.disks
1095
  else:
1096
    if not set(disks).issubset(instance.disks):
1097
      raise errors.ProgrammerError("Can only act on disks belonging to the"
1098
                                   " target instance: expected a subset of %r,"
1099
                                   " got %r" % (instance.disks, disks))
1100
    return disks
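
# Illustrative sketch (not part of the original module): ExpandCheckDisks only
# checks membership, so plain strings are used as stand-ins here; real callers
# pass L{objects.Disk} objects belonging to the instance.
def _ExampleExpandCheckDisks():
  class _FakeInstance(object):
    disks = ["disk0", "disk1"]
  inst = _FakeInstance()
  # None selects every disk; a valid subset is returned unchanged
  return (ExpandCheckDisks(inst, None), ExpandCheckDisks(inst, ["disk1"]))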
1101

    
1102

    
1103
def WaitForSync(lu, instance, disks=None, oneshot=False):
1104
  """Sleep and poll for an instance's disk to sync.
1105

1106
  """
1107
  if not instance.disks or disks is not None and not disks:
1108
    return True
1109

    
1110
  disks = ExpandCheckDisks(instance, disks)
1111

    
1112
  if not oneshot:
1113
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1114

    
1115
  node = instance.primary_node
1116

    
1117
  for dev in disks:
1118
    lu.cfg.SetDiskID(dev, node)
1119

    
1120
  # TODO: Convert to utils.Retry
1121

    
1122
  retries = 0
1123
  degr_retries = 10 # in seconds, as we sleep 1 second each time
1124
  while True:
1125
    max_time = 0
1126
    done = True
1127
    cumul_degraded = False
1128
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
1129
    msg = rstats.fail_msg
1130
    if msg:
1131
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1132
      retries += 1
1133
      if retries >= 10:
1134
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1135
                                 " aborting." % node)
1136
      time.sleep(6)
1137
      continue
1138
    rstats = rstats.payload
1139
    retries = 0
1140
    for i, mstat in enumerate(rstats):
1141
      if mstat is None:
1142
        lu.LogWarning("Can't compute data for node %s/%s",
1143
                      node, disks[i].iv_name)
1144
        continue
1145

    
1146
      cumul_degraded = (cumul_degraded or
1147
                        (mstat.is_degraded and mstat.sync_percent is None))
1148
      if mstat.sync_percent is not None:
1149
        done = False
1150
        if mstat.estimated_time is not None:
1151
          rem_time = ("%s remaining (estimated)" %
1152
                      utils.FormatSeconds(mstat.estimated_time))
1153
          max_time = mstat.estimated_time
1154
        else:
1155
          rem_time = "no time estimate"
1156
        lu.LogInfo("- device %s: %5.2f%% done, %s",
1157
                   disks[i].iv_name, mstat.sync_percent, rem_time)
1158

    
1159
    # if we're done but degraded, let's do a few small retries, to
1160
    # make sure we see a stable and not transient situation; therefore
1161
    # we force restart of the loop
1162
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
1163
      logging.info("Degraded disks found, %d retries left", degr_retries)
1164
      degr_retries -= 1
1165
      time.sleep(1)
1166
      continue
1167

    
1168
    if done or oneshot:
1169
      break
1170

    
1171
    time.sleep(min(60, max_time))
1172

    
1173
  if done:
1174
    lu.LogInfo("Instance %s's disks are in sync", instance.name)
1175

    
1176
  return not cumul_degraded
1177

    
1178

    
1179
def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1180
  """Shutdown block devices of an instance.
1181

1182
  This does the shutdown on all nodes of the instance.
1183

1184
  If the ignore_primary is false, errors on the primary node are
1185
  ignored.
1186

1187
  """
1188
  lu.cfg.MarkInstanceDisksInactive(instance.name)
1189
  all_result = True
1190
  disks = ExpandCheckDisks(instance, disks)
1191

    
1192
  for disk in disks:
1193
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1194
      lu.cfg.SetDiskID(top_disk, node)
1195
      result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
1196
      msg = result.fail_msg
1197
      if msg:
1198
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1199
                      disk.iv_name, node, msg)
1200
        if ((node == instance.primary_node and not ignore_primary) or
1201
            (node != instance.primary_node and not result.offline)):
1202
          all_result = False
1203
  return all_result
1204

    
1205

    
1206
def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1207
  """Shutdown block devices of an instance.
1208

1209
  This function checks if an instance is running, before calling
1210
  ShutdownInstanceDisks.
1211

1212
  """
1213
  CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1214
  ShutdownInstanceDisks(lu, instance, disks=disks)
1215

    
1216

    
1217
def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1218
                           ignore_size=False):
1219
  """Prepare the block devices for an instance.
1220

1221
  This sets up the block devices on all nodes.
1222

1223
  @type lu: L{LogicalUnit}
1224
  @param lu: the logical unit on whose behalf we execute
1225
  @type instance: L{objects.Instance}
1226
  @param instance: the instance for whose disks we assemble
1227
  @type disks: list of L{objects.Disk} or None
1228
  @param disks: which disks to assemble (or all, if None)
1229
  @type ignore_secondaries: boolean
1230
  @param ignore_secondaries: if true, errors on secondary nodes
1231
      won't result in an error return from the function
1232
  @type ignore_size: boolean
1233
  @param ignore_size: if true, the current known size of the disk
1234
      will not be used during the disk activation, useful for cases
1235
      when the size is wrong
1236
  @return: False if the operation failed, otherwise a list of
1237
      (host, instance_visible_name, node_visible_name)
1238
      with the mapping from node devices to instance devices
1239

1240
  """
1241
  device_info = []
1242
  disks_ok = True
1243
  iname = instance.name
1244
  disks = ExpandCheckDisks(instance, disks)
1245

    
1246
  # With the two passes mechanism we try to reduce the window of
1247
  # opportunity for the race condition of switching DRBD to primary
1248
  # before handshaking occurred, but we do not eliminate it
1249

    
1250
  # The proper fix would be to wait (with some limits) until the
1251
  # connection has been made and drbd transitions from WFConnection
1252
  # into any other network-connected state (Connected, SyncTarget,
1253
  # SyncSource, etc.)
1254

    
1255
  # mark instance disks as active before doing actual work, so watcher does
1256
  # not try to shut them down erroneously
1257
  lu.cfg.MarkInstanceDisksActive(iname)
1258

    
1259
  # 1st pass, assemble on all nodes in secondary mode
1260
  for idx, inst_disk in enumerate(disks):
1261
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1262
      if ignore_size:
1263
        node_disk = node_disk.Copy()
1264
        node_disk.UnsetSize()
1265
      lu.cfg.SetDiskID(node_disk, node)
1266
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1267
                                             False, idx)
1268
      msg = result.fail_msg
1269
      if msg:
1270
        is_offline_secondary = (node in instance.secondary_nodes and
1271
                                result.offline)
1272
        lu.LogWarning("Could not prepare block device %s on node %s"
1273
                      " (is_primary=False, pass=1): %s",
1274
                      inst_disk.iv_name, node, msg)
1275
        if not (ignore_secondaries or is_offline_secondary):
1276
          disks_ok = False
1277

    
1278
  # FIXME: race condition on drbd migration to primary
1279

    
1280
  # 2nd pass, do only the primary node
1281
  for idx, inst_disk in enumerate(disks):
1282
    dev_path = None
1283

    
1284
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1285
      if node != instance.primary_node:
1286
        continue
1287
      if ignore_size:
1288
        node_disk = node_disk.Copy()
1289
        node_disk.UnsetSize()
1290
      lu.cfg.SetDiskID(node_disk, node)
1291
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1292
                                             True, idx)
1293
      msg = result.fail_msg
1294
      if msg:
1295
        lu.LogWarning("Could not prepare block device %s on node %s"
1296
                      " (is_primary=True, pass=2): %s",
1297
                      inst_disk.iv_name, node, msg)
1298
        disks_ok = False
1299
      else:
1300
        dev_path = result.payload
1301

    
1302
    device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
1303

    
1304
  # leave the disks configured for the primary node
1305
  # this is a workaround that would be fixed better by
1306
  # improving the logical/physical id handling
1307
  for disk in disks:
1308
    lu.cfg.SetDiskID(disk, instance.primary_node)
1309

    
1310
  if not disks_ok:
1311
    lu.cfg.MarkInstanceDisksInactive(iname)
1312

    
1313
  return disks_ok, device_info
1314

    
1315

    
1316
def StartInstanceDisks(lu, instance, force):
1317
  """Start the disks of an instance.
1318

1319
  """
1320
  disks_ok, _ = AssembleInstanceDisks(lu, instance,
1321
                                      ignore_secondaries=force)
1322
  if not disks_ok:
1323
    ShutdownInstanceDisks(lu, instance)
1324
    if force is not None and not force:
1325
      lu.LogWarning("",
1326
                    hint=("If the message above refers to a secondary node,"
1327
                          " you can retry the operation using '--force'"))
1328
    raise errors.OpExecError("Disk consistency error")


class LUInstanceGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.needed_locks[locking.LEVEL_NODE_RES] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      "ABSOLUTE": self.op.absolute,
      }
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    nodenames = list(instance.all_nodes)
    for node in nodenames:
      CheckNodeOnline(self, node)

    self.instance = instance

    if instance.disk_template not in constants.DTS_GROWABLE:
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing", errors.ECODE_INVAL)

    self.disk = instance.FindDisk(self.op.disk)

    if self.op.absolute:
      self.target = self.op.amount
      self.delta = self.target - self.disk.size
      if self.delta < 0:
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
                                   "current disk size (%s)" %
                                   (utils.FormatUnit(self.target, "h"),
                                    utils.FormatUnit(self.disk.size, "h")),
                                   errors.ECODE_STATE)
    else:
      self.delta = self.op.amount
      self.target = self.disk.size + self.delta
      if self.delta < 0:
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
                                   utils.FormatUnit(self.delta, "h"),
                                   errors.ECODE_INVAL)
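
    # Example (illustrative values): for a 10240 MiB disk, amount=2048 with
    # absolute=False gives delta=2048 and target=12288; the same disk with
    # absolute=True and amount=12288 yields the identical result.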

    self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))

  def _CheckDiskSpace(self, nodenames, req_vgspace):
    template = self.instance.disk_template
    if template not in (constants.DTS_NO_FREE_SPACE_CHECK):
      # TODO: check the free disk space for file, when that feature will be
      # supported
      nodes = map(self.cfg.GetNodeInfo, nodenames)
      es_nodes = filter(lambda n: IsExclusiveStorageEnabledNode(self.cfg, n),
                        nodes)
      if es_nodes:
        # With exclusive storage we need to do something smarter than just
        # looking at free space; for now, let's simply abort the operation.
        raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
                                   " is enabled", errors.ECODE_STATE)
      CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = self.disk

    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks

    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[disk])
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block device to grow")

    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
                (self.op.disk, instance.name,
                 utils.FormatUnit(self.delta, "h"),
                 utils.FormatUnit(self.target, "h")))

    # First run all grow ops in dry-run mode
    for node in instance.all_nodes:
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
                                           True, True)
      result.Raise("Dry-run grow request failed to node %s" % node)

    if wipe_disks:
      # Get disk size from primary node for wiping
      result = self.rpc.call_blockdev_getdimensions(instance.primary_node,
                                                    [disk])
      result.Raise("Failed to retrieve disk size from node '%s'" %
                   instance.primary_node)

      (disk_dimensions, ) = result.payload

      if disk_dimensions is None:
        raise errors.OpExecError("Failed to retrieve disk size from primary"
                                 " node '%s'" % instance.primary_node)
      (disk_size_in_bytes, _) = disk_dimensions

      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)

      assert old_disk_size >= disk.size, \
        ("Retrieved disk size too small (got %s, should be at least %s)" %
         (old_disk_size, disk.size))
    else:
      old_disk_size = None

    # We know that (as far as we can test) operations across different
    # nodes will succeed, time to run it for real on the backing storage
    for node in instance.all_nodes:
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
                                           False, True)
      result.Raise("Grow request failed to node %s" % node)

    # And now execute it for logical storage, on the primary node
    node = instance.primary_node
    self.cfg.SetDiskID(disk, node)
    result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
                                         False, False)
    result.Raise("Grow request failed to node %s" % node)

    disk.RecordGrow(self.delta)
    self.cfg.Update(instance, feedback_fn)

    # Changes have been recorded, release node lock
    ReleaseLocks(self, locking.LEVEL_NODE)

    # Downgrade lock while waiting for sync
    self.glm.downgrade(locking.LEVEL_INSTANCE)

    assert wipe_disks ^ (old_disk_size is None)

    if wipe_disks:
      assert instance.disks[self.op.disk] == disk

      # Wipe newly added disk space
      WipeDisks(self, instance,
                disks=[(self.op.disk, disk, old_disk_size)])

    if self.op.wait_for_sync:
      disk_abort = not WaitForSync(self, instance, disks=[disk])
      if disk_abort:
        self.LogWarning("Disk syncing has not returned a good status; check"
                        " the instance")
      if not instance.disks_active:
        _SafeShutdownInstanceDisks(self, instance, disks=[disk])
    elif not instance.disks_active:
      self.LogWarning("Not shutting down the disk even if the instance is"
                      " not supposed to be running because no wait for"
                      " sync mode was requested")

    assert self.owned_locks(locking.LEVEL_NODE_RES)
    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
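

# Illustrative only: a grow request is normally submitted as an opcode whose
# fields match the self.op attributes used above, e.g. along the lines of
#
#   opcodes.OpInstanceGrowDisk(instance_name="inst1.example.com", disk=0,
#                              amount=2048, absolute=False, wait_for_sync=True)
#
# (the instance name and sizes here are made-up example values).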


class LUInstanceReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check arguments.

    """
    remote_node = self.op.remote_node
    ialloc = self.op.iallocator
    if self.op.mode == constants.REPLACE_DISK_CHG:
      if remote_node is None and ialloc is None:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given", errors.ECODE_INVAL)
      else:
        CheckIAllocatorOrNode(self, "iallocator", "remote_node")

    elif remote_node is not None or ialloc is not None:
      # Not replacing the secondary
      raise errors.OpPrereqError("The iallocator and new node options can"
                                 " only be used when changing the"
                                 " secondary node", errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    assert locking.LEVEL_NODE not in self.needed_locks
    assert locking.LEVEL_NODE_RES not in self.needed_locks
    assert locking.LEVEL_NODEGROUP not in self.needed_locks

    assert self.op.iallocator is None or self.op.remote_node is None, \
      "Conflicting options"

    if self.op.remote_node is not None:
      self.op.remote_node = ExpandNodeName(self.cfg, self.op.remote_node)

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8Dev.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

      if self.op.iallocator is not None:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

    self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
                                   self.op.iallocator, self.op.remote_node,
                                   self.op.disks, self.op.early_release,
                                   self.op.ignore_ipolicy)

    self.tasklets = [self.replacer]

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.remote_node is None
      assert self.op.iallocator is not None
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock all groups used by instance optimistically; this requires going
      # via the node before it's locked, requiring verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_name)

    elif level == locking.LEVEL_NODE:
      if self.op.iallocator is not None:
        assert self.op.remote_node is None
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)

        # Lock member nodes of all locked groups
        self.needed_locks[locking.LEVEL_NODE] = \
          [node_name
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
           for node_name in self.cfg.GetNodeGroup(group_uuid).members]
      else:
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

        self._LockInstancesNodes()

    elif level == locking.LEVEL_NODE_RES:
      # Reuse node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        self.needed_locks[locking.LEVEL_NODE]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    instance = self.replacer.instance
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": instance.secondary_nodes[0],
      }
    env.update(BuildInstanceHookEnvByObject(self, instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self.replacer.instance
    nl = [
      self.cfg.GetMasterNode(),
      instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
            self.op.iallocator is None)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)

    return LogicalUnit.CheckPrereq(self)


class LUInstanceActivateDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              AssembleInstanceDisks(self, self.instance,
                                    ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    if self.op.wait_for_sync:
      if not WaitForSync(self, self.instance):
        self.cfg.MarkInstanceDisksInactive(self.instance.name)
        raise errors.OpExecError("Some disks of the instance are degraded!")

    return disks_info
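

# Illustrative note: this LU is typically reached via "gnt-instance
# activate-disks"; the ignore_size and wait_for_sync attributes read from
# self.op above are fields of the corresponding opcode.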


class LUInstanceDeactivateDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    if self.op.force:
      ShutdownInstanceDisks(self, instance)
    else:
      _SafeShutdownInstanceDisks(self, instance)


def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
                               ldisk=False):
  """Check that mirrors are not degraded.

  @attention: The device has to be annotated already.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
                                                     on_primary)

  return result


def CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
  """Wrapper around L{_CheckDiskConsistencyInner}.

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
                                    ldisk=ldisk)


def _BlockdevFind(lu, node, dev, instance):
  """Wrapper around call_blockdev_find to annotate diskparams.

  @param lu: A reference to the lu object
  @param node: The node to call out
  @param dev: The device to find
  @param instance: The instance object the device belongs to
  @returns The result of the rpc call

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return lu.rpc.call_blockdev_find(node, disk)


def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
    results.append("%s%s" % (new_id, val))
  return results
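
# Example (hypothetical ID, illustrative only): _GenerateUniqueNames(lu,
# [".disk0_data"]) returns something like ["d2a49f7e-....disk0_data"], i.e. a
# freshly generated unique ID with the requested suffix appended.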


class TLReplaceDisks(Tasklet):
  """Replaces disks for an instance.

  Note: Locking is not within the scope of this class.

  """
  def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
               disks, early_release, ignore_ipolicy):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_name = instance_name
    self.mode = mode
    self.iallocator_name = iallocator_name
    self.remote_node = remote_node
    self.disks = disks
    self.early_release = early_release
    self.ignore_ipolicy = ignore_ipolicy

    # Runtime data
    self.instance = None
    self.new_node = None
    self.target_node = None
    self.other_node = None
    self.remote_node_info = None
    self.node_secondary_ip = None

  @staticmethod
  def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
    """Compute a new secondary node using an IAllocator.

    """
    req = iallocator.IAReqRelocate(name=instance_name,
                                   relocate_from=list(relocate_from))
    ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)

    ial.Run(iallocator_name)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (iallocator_name, ial.info),
                                 errors.ECODE_NORES)

    remote_node_name = ial.result[0]

    lu.LogInfo("Selected new secondary for instance '%s': %s",
               instance_name, remote_node_name)

    return remote_node_name

  def _FindFaultyDisks(self, node_name):
    """Wrapper for L{FindFaultyInstanceDisks}.

    """
    return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
                                   node_name, True)

  def _CheckDisksActivated(self, instance):
    """Checks if the instance disks are activated.

    @param instance: The instance to check disks
    @return: True if they are activated, False otherwise

    """
    nodes = instance.all_nodes

    for idx, dev in enumerate(instance.disks):
      for node in nodes:
        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
        self.cfg.SetDiskID(dev, node)

        result = _BlockdevFind(self, node, dev, instance)

        if result.offline:
          continue
        elif result.fail_msg or not result.payload:
          return False

    return True

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.instance_name

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances", errors.ECODE_INVAL)

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes),
                                 errors.ECODE_FAULT)

    instance = self.instance
    secondary_node = instance.secondary_nodes[0]

    if self.iallocator_name is None:
      remote_node = self.remote_node
    else:
      remote_node = self._RunAllocator(self.lu, self.iallocator_name,
                                       instance.name, instance.secondary_nodes)

    if remote_node is None:
      self.remote_node_info = None
    else:
      assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \
             "Remote node '%s' is not locked" % remote_node

      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node

    if remote_node == self.instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance", errors.ECODE_INVAL)

    if remote_node == secondary_node:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance",
                                 errors.ECODE_INVAL)

    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
                                    constants.REPLACE_DISK_CHG):
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
                                 errors.ECODE_INVAL)

    if self.mode == constants.REPLACE_DISK_AUTO:
      if not self._CheckDisksActivated(instance):
        raise errors.OpPrereqError("Please run activate-disks on instance %s"
                                   " first" % self.instance_name,
                                   errors.ECODE_STATE)
      faulty_primary = self._FindFaultyDisks(instance.primary_node)
      faulty_secondary = self._FindFaultyDisks(secondary_node)

      if faulty_primary and faulty_secondary:
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
                                   " one node and can not be repaired"
                                   " automatically" % self.instance_name,
                                   errors.ECODE_STATE)

      if faulty_primary:
        self.disks = faulty_primary
        self.target_node = instance.primary_node
        self.other_node = secondary_node
        check_nodes = [self.target_node, self.other_node]
      elif faulty_secondary:
        self.disks = faulty_secondary
        self.target_node = secondary_node
        self.other_node = instance.primary_node
        check_nodes = [self.target_node, self.other_node]
      else:
        self.disks = []
        check_nodes = []

    else:
      # Non-automatic modes
      if self.mode == constants.REPLACE_DISK_PRI:
        self.target_node = instance.primary_node
        self.other_node = secondary_node
        check_nodes = [self.target_node, self.other_node]

      elif self.mode == constants.REPLACE_DISK_SEC:
        self.target_node = secondary_node
        self.other_node = instance.primary_node
        check_nodes = [self.target_node, self.other_node]

      elif self.mode == constants.REPLACE_DISK_CHG:
        self.new_node = remote_node
        self.other_node = instance.primary_node
        self.target_node = secondary_node
        check_nodes = [self.new_node, self.other_node]

        CheckNodeNotDrained(self.lu, remote_node)
        CheckNodeVmCapable(self.lu, remote_node)

        old_node_info = self.cfg.GetNodeInfo(secondary_node)
        assert old_node_info is not None
        if old_node_info.offline and not self.early_release:
          # doesn't make sense to delay the release
          self.early_release = True
          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
                          " early-release mode", secondary_node)

      else:
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                     self.mode)

      # If not specified all disks should be replaced
      if not self.disks:
        self.disks = range(len(self.instance.disks))

    # TODO: This is ugly, but right now we can't distinguish between internal
    # submitted opcode and external one. We should fix that.
    if self.remote_node_info:
      # We change the node, lets verify it still meets instance policy
      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
      cluster = self.cfg.GetClusterInfo()
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              new_group_info)
      CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
                             self.cfg, ignore=self.ignore_ipolicy)

    for node in check_nodes:
      CheckNodeOnline(self.lu, node)

    touched_nodes = frozenset(node_name for node_name in [self.new_node,
                                                          self.other_node,
                                                          self.target_node]
                              if node_name is not None)

    # Release unneeded node and node resource locks
    ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    # Release any owned node group
    ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)

    # Check whether disks are valid
    for disk_idx in self.disks:
      instance.FindDisk(disk_idx)

    # Get secondary node IP addresses
    self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node)
                                  in self.cfg.GetMultiNodeInfo(touched_nodes))

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    if __debug__:
      # Verify owned locks before starting operation
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
      assert set(owned_nodes) == set(self.node_secondary_ip), \
          ("Incorrect node locks, owning %s, expected %s" %
           (owned_nodes, self.node_secondary_ip.keys()))
      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
              self.lu.owned_locks(locking.LEVEL_NODE_RES))
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
      assert list(owned_instances) == [self.instance_name], \
          "Instance '%s' not locked" % self.instance_name

      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
          "Should not own any node group lock at this point"

    if not self.disks:
      feedback_fn("No disks need replacement for instance '%s'" %
                  self.instance.name)
      return

    feedback_fn("Replacing disk(s) %s for instance '%s'" %
                (utils.CommaJoin(self.disks), self.instance.name))
    feedback_fn("Current primary node: %s" % self.instance.primary_node)
    feedback_fn("Current secondary node: %s" %
                utils.CommaJoin(self.instance.secondary_nodes))

    activate_disks = not self.instance.disks_active

    # Activate the instance disks if we're replacing them on a down instance
    if activate_disks:
      StartInstanceDisks(self.lu, self.instance, True)

    try:
      # Should we replace the secondary node?
      if self.new_node is not None:
        fn = self._ExecDrbd8Secondary
      else:
        fn = self._ExecDrbd8DiskOnly

      result = fn(feedback_fn)
    finally:
      # Deactivate the instance disks if we're replacing them on a
      # down instance
      if activate_disks:
        _SafeShutdownInstanceDisks(self.lu, self.instance)

    assert not self.lu.owned_locks(locking.LEVEL_NODE)

    if __debug__:
      # Verify owned locks
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
      nodes = frozenset(self.node_secondary_ip)
      assert ((self.early_release and not owned_nodes) or
              (not self.early_release and not (set(owned_nodes) - nodes))), \
        ("Not owning the correct locks, early_release=%s, owned=%r,"
         " nodes=%r" % (self.early_release, owned_nodes, nodes))

    return result
  def _CheckVolumeGroup(self, nodes):
    self.lu.LogInfo("Checking volume groups")

    vgname = self.cfg.GetVGName()

    # Make sure volume group exists on all involved nodes
    results = self.rpc.call_vg_list(nodes)
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")

    for node in nodes:
      res = results[node]
      res.Raise("Error checking node %s" % node)
      if vgname not in res.payload:
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
                                 (vgname, node))

  def _CheckDisksExistence(self, nodes):
    # Check disk existence
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      for node in nodes:
        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
        self.cfg.SetDiskID(dev, node)

        result = _BlockdevFind(self, node, dev, self.instance)

        msg = result.fail_msg
        if msg or not result.payload:
          if not msg:
            msg = "disk not found"
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                   (idx, node, msg))

  def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                      (idx, node_name))

      if not CheckDiskConsistency(self.lu, self.instance, dev, node_name,
                                  on_primary, ldisk=ldisk):
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                                 " replace disks for instance %s" %
                                 (node_name, self.instance.name))

  def _CreateNewStorage(self, node_name):
    """Create new storage on the primary or secondary node.

    This is only used for same-node replaces, not for changing the
    secondary node, hence we don't want to modify the existing disk.

    """
    iv_names = {}

    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    for idx, dev in enumerate(disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx)

      self.cfg.SetDiskID(dev, node_name)

      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
      names = _GenerateUniqueNames(self.lu, lv_names)

      (data_disk, meta_disk) = dev.children
      vg_data = data_disk.logical_id[0]
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
                             logical_id=(vg_data, names[0]),
                             params=data_disk.params)
      vg_meta = meta_disk.logical_id[0]
      lv_meta = objects.Disk(dev_type=constants.LD_LV,
                             size=constants.DRBD_META_SIZE,
                             logical_id=(vg_meta, names[1]),
                             params=meta_disk.params)

      new_lvs = [lv_data, lv_meta]
      old_lvs = [child.Copy() for child in dev.children]
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)

      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)

    return iv_names

  def _CheckDevices(self, node_name, iv_names):
    for name, (dev, _, _) in iv_names.iteritems():
      self.cfg.SetDiskID(dev, node_name)

      result = _BlockdevFind(self, node_name, dev, self.instance)

      msg = result.fail_msg
      if msg or not result.payload:
        if not msg:
          msg = "disk not found"
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))

      if result.payload.is_degraded:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

  def _RemoveOldStorage(self, node_name, iv_names):
    for name, (_, old_lvs, _) in iv_names.iteritems():
      self.lu.LogInfo("Remove logical volumes for %s", name)

      for lv in old_lvs:
        self.cfg.SetDiskID(lv, node_name)

        msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
        if msg:
          self.lu.LogWarning("Can't remove old LV: %s", msg,
                             hint="remove unused LVs manually")

  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
    """Replace a disk on the primary or secondary for DRBD 8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaces.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.other_node, self.target_node])
    self._CheckVolumeGroup([self.target_node, self.other_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.other_node,
                                self.other_node == self.instance.primary_node,
                                False)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    iv_names = self._CreateNewStorage(self.target_node)

    # Step: for each lv, detach+rename*2+attach
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)

      result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
                                                     old_lvs)
      result.Raise("Can't detach drbd from local storage on node"
                   " %s for device %s" % (self.target_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)

      # Build the rename list based on what LVs exist on the node
      rename_old_to_new = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(self.target_node, to_ren)
        if not result.fail_msg and result.payload:
          # device exists
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))

      self.lu.LogInfo("Renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(self.target_node,
                                             rename_old_to_new)
      result.Raise("Can't rename old LVs on node %s" % self.target_node)

      # Now we rename the new LVs to the old LVs
      self.lu.LogInfo("Renaming the new LVs on the target node")
      rename_new_to_old = [(new, old.physical_id)
                           for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(self.target_node,
                                             rename_new_to_old)
      result.Raise("Can't rename new LVs on node %s" % self.target_node)

      # Intermediate steps of in memory modifications
      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        self.cfg.SetDiskID(new, self.target_node)

      # We need to modify old_lvs so that removal later removes the
      # right LVs, not the newly added ones; note that old_lvs is a
      # copy here
      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        self.cfg.SetDiskID(disk, self.target_node)

      # Now that the new lvs have the old name, we can add them to the device
      self.lu.LogInfo("Adding new mirror component on %s", self.target_node)
      result = self.rpc.call_blockdev_addchildren(self.target_node,
                                                  (dev, self.instance), new_lvs)
      msg = result.fail_msg
      if msg:
        for new_lv in new_lvs:
          msg2 = self.rpc.call_blockdev_remove(self.target_node,
                                               new_lv).fail_msg
          if msg2:
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                               hint=("cleanup manually the unused logical"
                                     " volumes"))
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # Release all node locks while waiting for sync
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)

  def _ExecDrbd8Secondary(self, feedback_fn):
    """Replace the secondary node for DRBD 8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6

    pnode = self.instance.primary_node

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.instance.primary_node])
    self._CheckVolumeGroup([self.instance.primary_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.instance.primary_node, True, True)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
    for idx, dev in enumerate(disks):
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
                      (self.new_node, idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
                             True, GetInstanceInfoText(self.instance), False,
                             excl_stor)

    # Step 4: drbd minors and drbd setups changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    minors = self.cfg.AllocateDRBDMinor([self.new_node
                                         for dev in self.instance.disks],
                                        self.instance.name)
    logging.debug("Allocated minors %r", minors)

    iv_names = {}
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
                      (self.new_node, idx))
      # create new devices on new_node; note that we create two IDs:
      # one without port, so the drbd will be activated without
      # networking information on the new node at this stage, and one
      # with network, for the latter activation in step 4
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if self.instance.primary_node == o_node1:
        p_minor = o_minor1
      else:
        assert self.instance.primary_node == o_node2, "Three-node instance?"
        p_minor = o_minor2

      new_alone_id = (self.instance.primary_node, self.new_node, None,
                      p_minor, new_minor, o_secret)
      new_net_id = (self.instance.primary_node, self.new_node, o_port,
                    p_minor, new_minor, o_secret)
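
      # (Illustrative note: a DRBD8 logical_id is the 6-tuple
      # (node_a, node_b, port, minor_a, minor_b, secret); new_alone_id uses
      # None for the port so the device first comes up unconnected.)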

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children,
                              size=dev.size,
                              params={})
      (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
                                            self.cfg)
      try:
        CreateSingleBlockDev(self.lu, self.new_node, self.instance,
                             anno_new_drbd,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)
      except errors.GenericError:
        self.cfg.ReleaseDRBDMinors(self.instance.name)
        raise

    # We have new devices, shutdown the drbd on the old secondary
    for idx, dev in enumerate(self.instance.disks):
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
      self.cfg.SetDiskID(dev, self.target_node)
      msg = self.rpc.call_blockdev_shutdown(self.target_node,
                                            (dev, self.instance)).fail_msg
      if msg:
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                           " node: %s" % (idx, msg),
                           hint=("Please cleanup this device manually as"
                                 " soon as possible"))

    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
    result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
                                               self.instance.disks)[pnode]

    msg = result.fail_msg
    if msg:
      # detaches didn't succeed (unlikely)
      self.cfg.ReleaseDRBDMinors(self.instance.name)
      raise errors.OpExecError("Can't detach the disks from the network on"
                               " old node: %s" % (msg,))

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    self.lu.LogInfo("Updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      self.cfg.SetDiskID(dev, self.instance.primary_node)

    self.cfg.Update(self.instance, feedback_fn)

    # Release all node locks (the configuration has been updated)
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # and now perform the drbd attach
    self.lu.LogInfo("Attaching primary drbds to new secondary"
                    " (standalone => connected)")
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
                                            self.new_node],
                                           self.node_secondary_ip,
                                           (self.instance.disks, self.instance),
                                           self.instance.name,
                                           False)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
                           to_node, msg,
                           hint=("please do a gnt-instance info to see the"
                                 " status of disks"))

    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)