Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib / instance_storage.py @ 94e252a3

History | View | Annotate | Download (99.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with storage of instances."""
23

    
24
import itertools
25
import logging
26
import os
27
import time
28

    
29
from ganeti import compat
30
from ganeti import constants
31
from ganeti import errors
32
from ganeti import ht
33
from ganeti import locking
34
from ganeti.masterd import iallocator
35
from ganeti import objects
36
from ganeti import utils
37
from ganeti import rpc
38
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
39
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
40
  AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \
41
  CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
42
  IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes, \
43
  CheckDiskTemplateEnabled
44
from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
45
  CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
46
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
47

    
48
import ganeti.masterd.instance
49

    
50

    
51
_DISK_TEMPLATE_NAME_PREFIX = {
52
  constants.DT_PLAIN: "",
53
  constants.DT_RBD: ".rbd",
54
  constants.DT_EXT: ".ext",
55
  constants.DT_FILE: ".file",
56
  constants.DT_SHARED_FILE: ".sharedfile",
57
  }
58

    
59

    
60
def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
61
                         excl_stor):
62
  """Create a single block device on a given node.
63

64
  This will not recurse over children of the device, so they must be
65
  created in advance.
66

67
  @param lu: the lu on whose behalf we execute
68
  @param node_uuid: the node on which to create the device
69
  @type instance: L{objects.Instance}
70
  @param instance: the instance which owns the device
71
  @type device: L{objects.Disk}
72
  @param device: the device to create
73
  @param info: the extra 'metadata' we should attach to the device
74
      (this will be represented as a LVM tag)
75
  @type force_open: boolean
76
  @param force_open: this parameter will be passes to the
77
      L{backend.BlockdevCreate} function where it specifies
78
      whether we run on primary or not, and it affects both
79
      the child assembly and the device own Open() execution
80
  @type excl_stor: boolean
81
  @param excl_stor: Whether exclusive_storage is active for the node
82

83
  """
84
  lu.cfg.SetDiskID(device, node_uuid)
85
  result = lu.rpc.call_blockdev_create(node_uuid, device, device.size,
86
                                       instance.name, force_open, info,
87
                                       excl_stor)
88
  result.Raise("Can't create block device %s on"
89
               " node %s for instance %s" % (device,
90
                                             lu.cfg.GetNodeName(node_uuid),
91
                                             instance.name))
92
  if device.physical_id is None:
93
    device.physical_id = result.payload
94

    
95

    
96
def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
97
                         info, force_open, excl_stor):
98
  """Create a tree of block devices on a given node.
99

100
  If this device type has to be created on secondaries, create it and
101
  all its children.
102

103
  If not, just recurse to children keeping the same 'force' value.
104

105
  @attention: The device has to be annotated already.
106

107
  @param lu: the lu on whose behalf we execute
108
  @param node_uuid: the node on which to create the device
109
  @type instance: L{objects.Instance}
110
  @param instance: the instance which owns the device
111
  @type device: L{objects.Disk}
112
  @param device: the device to create
113
  @type force_create: boolean
114
  @param force_create: whether to force creation of this device; this
115
      will be change to True whenever we find a device which has
116
      CreateOnSecondary() attribute
117
  @param info: the extra 'metadata' we should attach to the device
118
      (this will be represented as a LVM tag)
119
  @type force_open: boolean
120
  @param force_open: this parameter will be passes to the
121
      L{backend.BlockdevCreate} function where it specifies
122
      whether we run on primary or not, and it affects both
123
      the child assembly and the device own Open() execution
124
  @type excl_stor: boolean
125
  @param excl_stor: Whether exclusive_storage is active for the node
126

127
  @return: list of created devices
128
  """
129
  created_devices = []
130
  try:
131
    if device.CreateOnSecondary():
132
      force_create = True
133

    
134
    if device.children:
135
      for child in device.children:
136
        devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
137
                                    force_create, info, force_open, excl_stor)
138
        created_devices.extend(devs)
139

    
140
    if not force_create:
141
      return created_devices
142

    
143
    CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
144
                         excl_stor)
145
    # The device has been completely created, so there is no point in keeping
146
    # its subdevices in the list. We just add the device itself instead.
147
    created_devices = [(node_uuid, device)]
148
    return created_devices
149

    
150
  except errors.DeviceCreationError, e:
151
    e.created_devices.extend(created_devices)
152
    raise e
153
  except errors.OpExecError, e:
154
    raise errors.DeviceCreationError(str(e), created_devices)
155

    
156

    
157
def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid):
158
  """Whether exclusive_storage is in effect for the given node.
159

160
  @type cfg: L{config.ConfigWriter}
161
  @param cfg: The cluster configuration
162
  @type node_uuid: string
163
  @param node_uuid: The node UUID
164
  @rtype: bool
165
  @return: The effective value of exclusive_storage
166
  @raise errors.OpPrereqError: if no node exists with the given name
167

168
  """
169
  ni = cfg.GetNodeInfo(node_uuid)
170
  if ni is None:
171
    raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
172
                               errors.ECODE_NOENT)
173
  return IsExclusiveStorageEnabledNode(cfg, ni)
174

    
175

    
176
def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
177
                    force_open):
178
  """Wrapper around L{_CreateBlockDevInner}.
179

180
  This method annotates the root device first.
181

182
  """
183
  (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
184
  excl_stor = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
185
  return _CreateBlockDevInner(lu, node_uuid, instance, disk, force_create, info,
186
                              force_open, excl_stor)
187

    
188

    
189
def _UndoCreateDisks(lu, disks_created):
190
  """Undo the work performed by L{CreateDisks}.
191

192
  This function is called in case of an error to undo the work of
193
  L{CreateDisks}.
194

195
  @type lu: L{LogicalUnit}
196
  @param lu: the logical unit on whose behalf we execute
197
  @param disks_created: the result returned by L{CreateDisks}
198

199
  """
200
  for (node_uuid, disk) in disks_created:
201
    lu.cfg.SetDiskID(disk, node_uuid)
202
    result = lu.rpc.call_blockdev_remove(node_uuid, disk)
203
    result.Warn("Failed to remove newly-created disk %s on node %s" %
204
                (disk, lu.cfg.GetNodeName(node_uuid)), logging.warning)
205

    
206

    
207
def CreateDisks(lu, instance, to_skip=None, target_node_uuid=None, disks=None):
208
  """Create all disks for an instance.
209

210
  This abstracts away some work from AddInstance.
211

212
  @type lu: L{LogicalUnit}
213
  @param lu: the logical unit on whose behalf we execute
214
  @type instance: L{objects.Instance}
215
  @param instance: the instance whose disks we should create
216
  @type to_skip: list
217
  @param to_skip: list of indices to skip
218
  @type target_node_uuid: string
219
  @param target_node_uuid: if passed, overrides the target node for creation
220
  @type disks: list of {objects.Disk}
221
  @param disks: the disks to create; if not specified, all the disks of the
222
      instance are created
223
  @return: information about the created disks, to be used to call
224
      L{_UndoCreateDisks}
225
  @raise errors.OpPrereqError: in case of error
226

227
  """
228
  info = GetInstanceInfoText(instance)
229
  if target_node_uuid is None:
230
    pnode_uuid = instance.primary_node
231
    all_node_uuids = instance.all_nodes
232
  else:
233
    pnode_uuid = target_node_uuid
234
    all_node_uuids = [pnode_uuid]
235

    
236
  if disks is None:
237
    disks = instance.disks
238

    
239
  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), instance.disk_template)
240

    
241
  if instance.disk_template in constants.DTS_FILEBASED:
242
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
243
    result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir)
244

    
245
    result.Raise("Failed to create directory '%s' on"
246
                 " node %s" % (file_storage_dir,
247
                               lu.cfg.GetNodeName(pnode_uuid)))
248

    
249
  disks_created = []
250
  for idx, device in enumerate(disks):
251
    if to_skip and idx in to_skip:
252
      continue
253
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
254
    for node_uuid in all_node_uuids:
255
      f_create = node_uuid == pnode_uuid
256
      try:
257
        _CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
258
                        f_create)
259
        disks_created.append((node_uuid, device))
260
      except errors.DeviceCreationError, e:
261
        logging.warning("Creating disk %s for instance '%s' failed",
262
                        idx, instance.name)
263
        disks_created.extend(e.created_devices)
264
        _UndoCreateDisks(lu, disks_created)
265
        raise errors.OpExecError(e.message)
266
  return disks_created
267

    
268

    
269
def ComputeDiskSizePerVG(disk_template, disks):
270
  """Compute disk size requirements in the volume group
271

272
  """
273
  def _compute(disks, payload):
274
    """Universal algorithm.
275

276
    """
277
    vgs = {}
278
    for disk in disks:
279
      vgs[disk[constants.IDISK_VG]] = \
280
        vgs.get(constants.IDISK_VG, 0) + disk[constants.IDISK_SIZE] + payload
281

    
282
    return vgs
283

    
284
  # Required free disk space as a function of disk and swap space
285
  req_size_dict = {
286
    constants.DT_DISKLESS: {},
287
    constants.DT_PLAIN: _compute(disks, 0),
288
    # 128 MB are added for drbd metadata for each disk
289
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
290
    constants.DT_FILE: {},
291
    constants.DT_SHARED_FILE: {},
292
    }
293

    
294
  if disk_template not in req_size_dict:
295
    raise errors.ProgrammerError("Disk template '%s' size requirement"
296
                                 " is unknown" % disk_template)
297

    
298
  return req_size_dict[disk_template]
299

    
300

    
301
def ComputeDisks(op, default_vg):
302
  """Computes the instance disks.
303

304
  @param op: The instance opcode
305
  @param default_vg: The default_vg to assume
306

307
  @return: The computed disks
308

309
  """
310
  disks = []
311
  for disk in op.disks:
312
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
313
    if mode not in constants.DISK_ACCESS_SET:
314
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
315
                                 mode, errors.ECODE_INVAL)
316
    size = disk.get(constants.IDISK_SIZE, None)
317
    if size is None:
318
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
319
    try:
320
      size = int(size)
321
    except (TypeError, ValueError):
322
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
323
                                 errors.ECODE_INVAL)
324

    
325
    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
326
    if ext_provider and op.disk_template != constants.DT_EXT:
327
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
328
                                 " disk template, not %s" %
329
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
330
                                  op.disk_template), errors.ECODE_INVAL)
331

    
332
    data_vg = disk.get(constants.IDISK_VG, default_vg)
333
    name = disk.get(constants.IDISK_NAME, None)
334
    if name is not None and name.lower() == constants.VALUE_NONE:
335
      name = None
336
    new_disk = {
337
      constants.IDISK_SIZE: size,
338
      constants.IDISK_MODE: mode,
339
      constants.IDISK_VG: data_vg,
340
      constants.IDISK_NAME: name,
341
      }
342

    
343
    for key in [
344
      constants.IDISK_METAVG,
345
      constants.IDISK_ADOPT,
346
      constants.IDISK_SPINDLES,
347
      ]:
348
      if key in disk:
349
        new_disk[key] = disk[key]
350

    
351
    # For extstorage, demand the `provider' option and add any
352
    # additional parameters (ext-params) to the dict
353
    if op.disk_template == constants.DT_EXT:
354
      if ext_provider:
355
        new_disk[constants.IDISK_PROVIDER] = ext_provider
356
        for key in disk:
357
          if key not in constants.IDISK_PARAMS:
358
            new_disk[key] = disk[key]
359
      else:
360
        raise errors.OpPrereqError("Missing provider for template '%s'" %
361
                                   constants.DT_EXT, errors.ECODE_INVAL)
362

    
363
    disks.append(new_disk)
364

    
365
  return disks
366

    
367

    
368
def CheckRADOSFreeSpace():
369
  """Compute disk size requirements inside the RADOS cluster.
370

371
  """
372
  # For the RADOS cluster we assume there is always enough space.
373
  pass
374

    
375

    
376
def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
377
                         iv_name, p_minor, s_minor):
378
  """Generate a drbd8 device complete with its children.
379

380
  """
381
  assert len(vgnames) == len(names) == 2
382
  port = lu.cfg.AllocatePort()
383
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
384

    
385
  dev_data = objects.Disk(dev_type=constants.DT_PLAIN, size=size,
386
                          logical_id=(vgnames[0], names[0]),
387
                          params={})
388
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
389
  dev_meta = objects.Disk(dev_type=constants.DT_PLAIN,
390
                          size=constants.DRBD_META_SIZE,
391
                          logical_id=(vgnames[1], names[1]),
392
                          params={})
393
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
394
  drbd_dev = objects.Disk(dev_type=constants.DT_DRBD8, size=size,
395
                          logical_id=(primary_uuid, secondary_uuid, port,
396
                                      p_minor, s_minor,
397
                                      shared_secret),
398
                          children=[dev_data, dev_meta],
399
                          iv_name=iv_name, params={})
400
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
401
  return drbd_dev
402

    
403

    
404
def GenerateDiskTemplate(
405
  lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
406
  disk_info, file_storage_dir, file_driver, base_index,
407
  feedback_fn, full_disk_params):
408
  """Generate the entire disk layout for a given template type.
409

410
  """
411
  vgname = lu.cfg.GetVGName()
412
  disk_count = len(disk_info)
413
  disks = []
414

    
415
  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), template_name)
416

    
417
  if template_name == constants.DT_DISKLESS:
418
    pass
419
  elif template_name == constants.DT_DRBD8:
420
    if len(secondary_node_uuids) != 1:
421
      raise errors.ProgrammerError("Wrong template configuration")
422
    remote_node_uuid = secondary_node_uuids[0]
423
    minors = lu.cfg.AllocateDRBDMinor(
424
      [primary_node_uuid, remote_node_uuid] * len(disk_info), instance_uuid)
425

    
426
    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
427
                                                       full_disk_params)
428
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
429

    
430
    names = []
431
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
432
                                               for i in range(disk_count)]):
433
      names.append(lv_prefix + "_data")
434
      names.append(lv_prefix + "_meta")
435
    for idx, disk in enumerate(disk_info):
436
      disk_index = idx + base_index
437
      data_vg = disk.get(constants.IDISK_VG, vgname)
438
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
439
      disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
440
                                      disk[constants.IDISK_SIZE],
441
                                      [data_vg, meta_vg],
442
                                      names[idx * 2:idx * 2 + 2],
443
                                      "disk/%d" % disk_index,
444
                                      minors[idx * 2], minors[idx * 2 + 1])
445
      disk_dev.mode = disk[constants.IDISK_MODE]
446
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
447
      disks.append(disk_dev)
448
  else:
449
    if secondary_node_uuids:
450
      raise errors.ProgrammerError("Wrong template configuration")
451

    
452
    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
453
    if name_prefix is None:
454
      names = None
455
    else:
456
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
457
                                        (name_prefix, base_index + i)
458
                                        for i in range(disk_count)])
459

    
460
    if template_name == constants.DT_PLAIN:
461

    
462
      def logical_id_fn(idx, _, disk):
463
        vg = disk.get(constants.IDISK_VG, vgname)
464
        return (vg, names[idx])
465

    
466
    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
467
      logical_id_fn = \
468
        lambda _, disk_index, disk: (file_driver,
469
                                     "%s/%s" % (file_storage_dir,
470
                                                names[idx]))
471
    elif template_name == constants.DT_BLOCK:
472
      logical_id_fn = \
473
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
474
                                       disk[constants.IDISK_ADOPT])
475
    elif template_name == constants.DT_RBD:
476
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
477
    elif template_name == constants.DT_EXT:
478
      def logical_id_fn(idx, _, disk):
479
        provider = disk.get(constants.IDISK_PROVIDER, None)
480
        if provider is None:
481
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
482
                                       " not found", constants.DT_EXT,
483
                                       constants.IDISK_PROVIDER)
484
        return (provider, names[idx])
485
    else:
486
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
487

    
488
    dev_type = template_name
489

    
490
    for idx, disk in enumerate(disk_info):
491
      params = {}
492
      # Only for the Ext template add disk_info to params
493
      if template_name == constants.DT_EXT:
494
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
495
        for key in disk:
496
          if key not in constants.IDISK_PARAMS:
497
            params[key] = disk[key]
498
      disk_index = idx + base_index
499
      size = disk[constants.IDISK_SIZE]
500
      feedback_fn("* disk %s, size %s" %
501
                  (disk_index, utils.FormatUnit(size, "h")))
502
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
503
                              logical_id=logical_id_fn(idx, disk_index, disk),
504
                              iv_name="disk/%d" % disk_index,
505
                              mode=disk[constants.IDISK_MODE],
506
                              params=params,
507
                              spindles=disk.get(constants.IDISK_SPINDLES))
508
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
509
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
510
      disks.append(disk_dev)
511

    
512
  return disks
513

    
514

    
515
def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
516
  """Check the presence of the spindle options with exclusive_storage.
517

518
  @type diskdict: dict
519
  @param diskdict: disk parameters
520
  @type es_flag: bool
521
  @param es_flag: the effective value of the exlusive_storage flag
522
  @type required: bool
523
  @param required: whether spindles are required or just optional
524
  @raise errors.OpPrereqError when spindles are given and they should not
525

526
  """
527
  if (not es_flag and constants.IDISK_SPINDLES in diskdict and
528
      diskdict[constants.IDISK_SPINDLES] is not None):
529
    raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
530
                               " when exclusive storage is not active",
531
                               errors.ECODE_INVAL)
532
  if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or
533
                                diskdict[constants.IDISK_SPINDLES] is None)):
534
    raise errors.OpPrereqError("You must specify spindles in instance disks"
535
                               " when exclusive storage is active",
536
                               errors.ECODE_INVAL)
537

    
538

    
539
class LUInstanceRecreateDisks(LogicalUnit):
540
  """Recreate an instance's missing disks.
541

542
  """
543
  HPATH = "instance-recreate-disks"
544
  HTYPE = constants.HTYPE_INSTANCE
545
  REQ_BGL = False
546

    
547
  _MODIFYABLE = compat.UniqueFrozenset([
548
    constants.IDISK_SIZE,
549
    constants.IDISK_MODE,
550
    constants.IDISK_SPINDLES,
551
    ])
552

    
553
  # New or changed disk parameters may have different semantics
554
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
555
    constants.IDISK_ADOPT,
556

    
557
    # TODO: Implement support changing VG while recreating
558
    constants.IDISK_VG,
559
    constants.IDISK_METAVG,
560
    constants.IDISK_PROVIDER,
561
    constants.IDISK_NAME,
562
    ]))
563

    
564
  def _RunAllocator(self):
565
    """Run the allocator based on input opcode.
566

567
    """
568
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
569

    
570
    # FIXME
571
    # The allocator should actually run in "relocate" mode, but current
572
    # allocators don't support relocating all the nodes of an instance at
573
    # the same time. As a workaround we use "allocate" mode, but this is
574
    # suboptimal for two reasons:
575
    # - The instance name passed to the allocator is present in the list of
576
    #   existing instances, so there could be a conflict within the
577
    #   internal structures of the allocator. This doesn't happen with the
578
    #   current allocators, but it's a liability.
579
    # - The allocator counts the resources used by the instance twice: once
580
    #   because the instance exists already, and once because it tries to
581
    #   allocate a new instance.
582
    # The allocator could choose some of the nodes on which the instance is
583
    # running, but that's not a problem. If the instance nodes are broken,
584
    # they should be already be marked as drained or offline, and hence
585
    # skipped by the allocator. If instance disks have been lost for other
586
    # reasons, then recreating the disks on the same nodes should be fine.
587
    disk_template = self.instance.disk_template
588
    spindle_use = be_full[constants.BE_SPINDLE_USE]
589
    disks = [{
590
      constants.IDISK_SIZE: d.size,
591
      constants.IDISK_MODE: d.mode,
592
      constants.IDISK_SPINDLES: d.spindles,
593
      } for d in self.instance.disks]
594
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
595
                                        disk_template=disk_template,
596
                                        tags=list(self.instance.GetTags()),
597
                                        os=self.instance.os,
598
                                        nics=[{}],
599
                                        vcpus=be_full[constants.BE_VCPUS],
600
                                        memory=be_full[constants.BE_MAXMEM],
601
                                        spindle_use=spindle_use,
602
                                        disks=disks,
603
                                        hypervisor=self.instance.hypervisor,
604
                                        node_whitelist=None)
605
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
606

    
607
    ial.Run(self.op.iallocator)
608

    
609
    assert req.RequiredNodes() == len(self.instance.all_nodes)
610

    
611
    if not ial.success:
612
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
613
                                 " %s" % (self.op.iallocator, ial.info),
614
                                 errors.ECODE_NORES)
615

    
616
    (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
617
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
618
                 self.op.instance_name, self.op.iallocator,
619
                 utils.CommaJoin(self.op.nodes))
620

    
621
  def CheckArguments(self):
622
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
623
      # Normalize and convert deprecated list of disk indices
624
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
625

    
626
    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
627
    if duplicates:
628
      raise errors.OpPrereqError("Some disks have been specified more than"
629
                                 " once: %s" % utils.CommaJoin(duplicates),
630
                                 errors.ECODE_INVAL)
631

    
632
    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
633
    # when neither iallocator nor nodes are specified
634
    if self.op.iallocator or self.op.nodes:
635
      CheckIAllocatorOrNode(self, "iallocator", "nodes")
636

    
637
    for (idx, params) in self.op.disks:
638
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
639
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
640
      if unsupported:
641
        raise errors.OpPrereqError("Parameters for disk %s try to change"
642
                                   " unmodifyable parameter(s): %s" %
643
                                   (idx, utils.CommaJoin(unsupported)),
644
                                   errors.ECODE_INVAL)
645

    
646
  def ExpandNames(self):
647
    self._ExpandAndLockInstance()
648
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
649

    
650
    if self.op.nodes:
651
      (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
652
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)
653
    else:
654
      self.needed_locks[locking.LEVEL_NODE] = []
655
      if self.op.iallocator:
656
        # iallocator will select a new node in the same group
657
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
658
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
659

    
660
    self.needed_locks[locking.LEVEL_NODE_RES] = []
661

    
662
  def DeclareLocks(self, level):
663
    if level == locking.LEVEL_NODEGROUP:
664
      assert self.op.iallocator is not None
665
      assert not self.op.nodes
666
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
667
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
668
      # Lock the primary group used by the instance optimistically; this
669
      # requires going via the node before it's locked, requiring
670
      # verification later on
671
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
672
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, primary_only=True)
673

    
674
    elif level == locking.LEVEL_NODE:
675
      # If an allocator is used, then we lock all the nodes in the current
676
      # instance group, as we don't know yet which ones will be selected;
677
      # if we replace the nodes without using an allocator, locks are
678
      # already declared in ExpandNames; otherwise, we need to lock all the
679
      # instance nodes for disk re-creation
680
      if self.op.iallocator:
681
        assert not self.op.nodes
682
        assert not self.needed_locks[locking.LEVEL_NODE]
683
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
684

    
685
        # Lock member nodes of the group of the primary node
686
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
687
          self.needed_locks[locking.LEVEL_NODE].extend(
688
            self.cfg.GetNodeGroup(group_uuid).members)
689

    
690
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
691
      elif not self.op.nodes:
692
        self._LockInstancesNodes(primary_only=False)
693
    elif level == locking.LEVEL_NODE_RES:
694
      # Copy node locks
695
      self.needed_locks[locking.LEVEL_NODE_RES] = \
696
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])
697

    
698
  def BuildHooksEnv(self):
699
    """Build hooks env.
700

701
    This runs on master, primary and secondary nodes of the instance.
702

703
    """
704
    return BuildInstanceHookEnvByObject(self, self.instance)
705

    
706
  def BuildHooksNodes(self):
707
    """Build hooks nodes.
708

709
    """
710
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
711
    return (nl, nl)
712

    
713
  def CheckPrereq(self):
714
    """Check prerequisites.
715

716
    This checks that the instance is in the cluster and is not running.
717

718
    """
719
    instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
720
    assert instance is not None, \
721
      "Cannot retrieve locked instance %s" % self.op.instance_name
722
    if self.op.node_uuids:
723
      if len(self.op.node_uuids) != len(instance.all_nodes):
724
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
725
                                   " %d replacement nodes were specified" %
726
                                   (instance.name, len(instance.all_nodes),
727
                                    len(self.op.node_uuids)),
728
                                   errors.ECODE_INVAL)
729
      assert instance.disk_template != constants.DT_DRBD8 or \
730
             len(self.op.node_uuids) == 2
731
      assert instance.disk_template != constants.DT_PLAIN or \
732
             len(self.op.node_uuids) == 1
733
      primary_node = self.op.node_uuids[0]
734
    else:
735
      primary_node = instance.primary_node
736
    if not self.op.iallocator:
737
      CheckNodeOnline(self, primary_node)
738

    
739
    if instance.disk_template == constants.DT_DISKLESS:
740
      raise errors.OpPrereqError("Instance '%s' has no disks" %
741
                                 self.op.instance_name, errors.ECODE_INVAL)
742

    
743
    # Verify if node group locks are still correct
744
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
745
    if owned_groups:
746
      # Node group locks are acquired only for the primary node (and only
747
      # when the allocator is used)
748
      CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
749
                              primary_only=True)
750

    
751
    # if we replace nodes *and* the old primary is offline, we don't
752
    # check the instance state
753
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
754
    if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
755
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
756
                         msg="cannot recreate disks")
757

    
758
    if self.op.disks:
759
      self.disks = dict(self.op.disks)
760
    else:
761
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
762

    
763
    maxidx = max(self.disks.keys())
764
    if maxidx >= len(instance.disks):
765
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
766
                                 errors.ECODE_INVAL)
767

    
768
    if ((self.op.node_uuids or self.op.iallocator) and
769
         sorted(self.disks.keys()) != range(len(instance.disks))):
770
      raise errors.OpPrereqError("Can't recreate disks partially and"
771
                                 " change the nodes at the same time",
772
                                 errors.ECODE_INVAL)
773

    
774
    self.instance = instance
775

    
776
    if self.op.iallocator:
777
      self._RunAllocator()
778
      # Release unneeded node and node resource locks
779
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
780
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
781
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
782

    
783
    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
784

    
785
    if self.op.node_uuids:
786
      node_uuids = self.op.node_uuids
787
    else:
788
      node_uuids = instance.all_nodes
789
    excl_stor = compat.any(
790
      rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
791
      )
792
    for new_params in self.disks.values():
793
      CheckSpindlesExclusiveStorage(new_params, excl_stor, False)
794

    
795
  def Exec(self, feedback_fn):
796
    """Recreate the disks.
797

798
    """
799
    assert (self.owned_locks(locking.LEVEL_NODE) ==
800
            self.owned_locks(locking.LEVEL_NODE_RES))
801

    
802
    to_skip = []
803
    mods = [] # keeps track of needed changes
804

    
805
    for idx, disk in enumerate(self.instance.disks):
806
      try:
807
        changes = self.disks[idx]
808
      except KeyError:
809
        # Disk should not be recreated
810
        to_skip.append(idx)
811
        continue
812

    
813
      # update secondaries for disks, if needed
814
      if self.op.node_uuids and disk.dev_type == constants.DT_DRBD8:
815
        # need to update the nodes and minors
816
        assert len(self.op.node_uuids) == 2
817
        assert len(disk.logical_id) == 6 # otherwise disk internals
818
                                         # have changed
819
        (_, _, old_port, _, _, old_secret) = disk.logical_id
820
        new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
821
                                                self.instance.uuid)
822
        new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
823
                  new_minors[0], new_minors[1], old_secret)
824
        assert len(disk.logical_id) == len(new_id)
825
      else:
826
        new_id = None
827

    
828
      mods.append((idx, new_id, changes))
829

    
830
    # now that we have passed all asserts above, we can apply the mods
831
    # in a single run (to avoid partial changes)
832
    for idx, new_id, changes in mods:
833
      disk = self.instance.disks[idx]
834
      if new_id is not None:
835
        assert disk.dev_type == constants.DT_DRBD8
836
        disk.logical_id = new_id
837
      if changes:
838
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
839
                    mode=changes.get(constants.IDISK_MODE, None),
840
                    spindles=changes.get(constants.IDISK_SPINDLES, None))
841

    
842
    # change primary node, if needed
843
    if self.op.node_uuids:
844
      self.instance.primary_node = self.op.node_uuids[0]
845
      self.LogWarning("Changing the instance's nodes, you will have to"
846
                      " remove any disks left on the older nodes manually")
847

    
848
    if self.op.node_uuids:
849
      self.cfg.Update(self.instance, feedback_fn)
850

    
851
    # All touched nodes must be locked
852
    mylocks = self.owned_locks(locking.LEVEL_NODE)
853
    assert mylocks.issuperset(frozenset(self.instance.all_nodes))
854
    new_disks = CreateDisks(self, self.instance, to_skip=to_skip)
855

    
856
    # TODO: Release node locks before wiping, or explain why it's not possible
857
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
858
      wipedisks = [(idx, disk, 0)
859
                   for (idx, disk) in enumerate(self.instance.disks)
860
                   if idx not in to_skip]
861
      WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
862
                         cleanup=new_disks)
863

    
864

    
865
def _PerformNodeInfoCall(lu, node_uuids, vg):
866
  """Prepares the input and performs a node info call.
867

868
  @type lu: C{LogicalUnit}
869
  @param lu: a logical unit from which we get configuration data
870
  @type node_uuids: list of string
871
  @param node_uuids: list of node UUIDs to perform the call for
872
  @type vg: string
873
  @param vg: the volume group's name
874

875
  """
876
  lvm_storage_units = [(constants.ST_LVM_VG, vg)]
877
  storage_units = rpc.PrepareStorageUnitsForNodes(lu.cfg, lvm_storage_units,
878
                                                  node_uuids)
879
  hvname = lu.cfg.GetHypervisorType()
880
  hvparams = lu.cfg.GetClusterInfo().hvparams
881
  nodeinfo = lu.rpc.call_node_info(node_uuids, storage_units,
882
                                   [(hvname, hvparams[hvname])])
883
  return nodeinfo
884

    
885

    
886
def _CheckVgCapacityForNode(node_name, node_info, vg, requested):
887
  """Checks the vg capacity for a given node.
888

889
  @type node_info: tuple (_, list of dicts, _)
890
  @param node_info: the result of the node info call for one node
891
  @type node_name: string
892
  @param node_name: the name of the node
893
  @type vg: string
894
  @param vg: volume group name
895
  @type requested: int
896
  @param requested: the amount of disk in MiB to check for
897
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
898
      or we cannot check the node
899

900
  """
901
  (_, space_info, _) = node_info
902
  lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType(
903
      space_info, constants.ST_LVM_VG)
904
  if not lvm_vg_info:
905
    raise errors.OpPrereqError("Can't retrieve storage information for LVM")
906
  vg_free = lvm_vg_info.get("storage_free", None)
907
  if not isinstance(vg_free, int):
908
    raise errors.OpPrereqError("Can't compute free disk space on node"
909
                               " %s for vg %s, result was '%s'" %
910
                               (node_name, vg, vg_free), errors.ECODE_ENVIRON)
911
  if requested > vg_free:
912
    raise errors.OpPrereqError("Not enough disk space on target node %s"
913
                               " vg %s: required %d MiB, available %d MiB" %
914
                               (node_name, vg, requested, vg_free),
915
                               errors.ECODE_NORES)
916

    
917

    
918
def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
919
  """Checks if nodes have enough free disk space in the specified VG.
920

921
  This function checks if all given nodes have the needed amount of
922
  free disk. In case any node has less disk or we cannot get the
923
  information from the node, this function raises an OpPrereqError
924
  exception.
925

926
  @type lu: C{LogicalUnit}
927
  @param lu: a logical unit from which we get configuration data
928
  @type node_uuids: C{list}
929
  @param node_uuids: the list of node UUIDs to check
930
  @type vg: C{str}
931
  @param vg: the volume group to check
932
  @type requested: C{int}
933
  @param requested: the amount of disk in MiB to check for
934
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
935
      or we cannot check the node
936

937
  """
938
  nodeinfo = _PerformNodeInfoCall(lu, node_uuids, vg)
939
  for node in node_uuids:
940
    node_name = lu.cfg.GetNodeName(node)
941
    info = nodeinfo[node]
942
    info.Raise("Cannot get current information from node %s" % node_name,
943
               prereq=True, ecode=errors.ECODE_ENVIRON)
944
    _CheckVgCapacityForNode(node_name, info.payload, vg, requested)
945

    
946

    
947
def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
948
  """Checks if nodes have enough free disk space in all the VGs.
949

950
  This function checks if all given nodes have the needed amount of
951
  free disk. In case any node has less disk or we cannot get the
952
  information from the node, this function raises an OpPrereqError
953
  exception.
954

955
  @type lu: C{LogicalUnit}
956
  @param lu: a logical unit from which we get configuration data
957
  @type node_uuids: C{list}
958
  @param node_uuids: the list of node UUIDs to check
959
  @type req_sizes: C{dict}
960
  @param req_sizes: the hash of vg and corresponding amount of disk in
961
      MiB to check for
962
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
963
      or we cannot check the node
964

965
  """
966
  for vg, req_size in req_sizes.items():
967
    _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, req_size)
968

    
969

    
970
def _DiskSizeInBytesToMebibytes(lu, size):
971
  """Converts a disk size in bytes to mebibytes.
972

973
  Warns and rounds up if the size isn't an even multiple of 1 MiB.
974

975
  """
976
  (mib, remainder) = divmod(size, 1024 * 1024)
977

    
978
  if remainder != 0:
979
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
980
                  " to not overwrite existing data (%s bytes will not be"
981
                  " wiped)", (1024 * 1024) - remainder)
982
    mib += 1
983

    
984
  return mib
985

    
986

    
987
def _CalcEta(time_taken, written, total_size):
988
  """Calculates the ETA based on size written and total size.
989

990
  @param time_taken: The time taken so far
991
  @param written: amount written so far
992
  @param total_size: The total size of data to be written
993
  @return: The remaining time in seconds
994

995
  """
996
  avg_time = time_taken / float(written)
997
  return (total_size - written) * avg_time
998

    
999

    
1000
def WipeDisks(lu, instance, disks=None):
1001
  """Wipes instance disks.
1002

1003
  @type lu: L{LogicalUnit}
1004
  @param lu: the logical unit on whose behalf we execute
1005
  @type instance: L{objects.Instance}
1006
  @param instance: the instance whose disks we should create
1007
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
1008
  @param disks: Disk details; tuple contains disk index, disk object and the
1009
    start offset
1010

1011
  """
1012
  node_uuid = instance.primary_node
1013
  node_name = lu.cfg.GetNodeName(node_uuid)
1014

    
1015
  if disks is None:
1016
    disks = [(idx, disk, 0)
1017
             for (idx, disk) in enumerate(instance.disks)]
1018

    
1019
  for (_, device, _) in disks:
1020
    lu.cfg.SetDiskID(device, node_uuid)
1021

    
1022
  logging.info("Pausing synchronization of disks of instance '%s'",
1023
               instance.name)
1024
  result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1025
                                                  (map(compat.snd, disks),
1026
                                                   instance),
1027
                                                  True)
1028
  result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)
1029

    
1030
  for idx, success in enumerate(result.payload):
1031
    if not success:
1032
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
1033
                   " failed", idx, instance.name)
1034

    
1035
  try:
1036
    for (idx, device, offset) in disks:
1037
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
1038
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
1039
      wipe_chunk_size = \
1040
        int(min(constants.MAX_WIPE_CHUNK,
1041
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
1042

    
1043
      size = device.size
1044
      last_output = 0
1045
      start_time = time.time()
1046

    
1047
      if offset == 0:
1048
        info_text = ""
1049
      else:
1050
        info_text = (" (from %s to %s)" %
1051
                     (utils.FormatUnit(offset, "h"),
1052
                      utils.FormatUnit(size, "h")))
1053

    
1054
      lu.LogInfo("* Wiping disk %s%s", idx, info_text)
1055

    
1056
      logging.info("Wiping disk %d for instance %s on node %s using"
1057
                   " chunk size %s", idx, instance.name, node_name,
1058
                   wipe_chunk_size)
1059

    
1060
      while offset < size:
1061
        wipe_size = min(wipe_chunk_size, size - offset)
1062

    
1063
        logging.debug("Wiping disk %d, offset %s, chunk %s",
1064
                      idx, offset, wipe_size)
1065

    
1066
        result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
1067
                                           offset, wipe_size)
1068
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
1069
                     (idx, offset, wipe_size))
1070

    
1071
        now = time.time()
1072
        offset += wipe_size
1073
        if now - last_output >= 60:
1074
          eta = _CalcEta(now - start_time, offset, size)
1075
          lu.LogInfo(" - done: %.1f%% ETA: %s",
1076
                     offset / float(size) * 100, utils.FormatSeconds(eta))
1077
          last_output = now
1078
  finally:
1079
    logging.info("Resuming synchronization of disks for instance '%s'",
1080
                 instance.name)
1081

    
1082
    result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1083
                                                    (map(compat.snd, disks),
1084
                                                     instance),
1085
                                                    False)
1086

    
1087
    if result.fail_msg:
1088
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
1089
                    node_name, result.fail_msg)
1090
    else:
1091
      for idx, success in enumerate(result.payload):
1092
        if not success:
1093
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
1094
                        " failed", idx, instance.name)
1095

    
1096

    
1097
def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
1098
  """Wrapper for L{WipeDisks} that handles errors.
1099

1100
  @type lu: L{LogicalUnit}
1101
  @param lu: the logical unit on whose behalf we execute
1102
  @type instance: L{objects.Instance}
1103
  @param instance: the instance whose disks we should wipe
1104
  @param disks: see L{WipeDisks}
1105
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
1106
      case of error
1107
  @raise errors.OpPrereqError: in case of failure
1108

1109
  """
1110
  try:
1111
    WipeDisks(lu, instance, disks=disks)
1112
  except errors.OpExecError:
1113
    logging.warning("Wiping disks for instance '%s' failed",
1114
                    instance.name)
1115
    _UndoCreateDisks(lu, cleanup)
1116
    raise
1117

    
1118

    
1119
def ExpandCheckDisks(instance, disks):
1120
  """Return the instance disks selected by the disks list
1121

1122
  @type disks: list of L{objects.Disk} or None
1123
  @param disks: selected disks
1124
  @rtype: list of L{objects.Disk}
1125
  @return: selected instance disks to act on
1126

1127
  """
1128
  if disks is None:
1129
    return instance.disks
1130
  else:
1131
    if not set(disks).issubset(instance.disks):
1132
      raise errors.ProgrammerError("Can only act on disks belonging to the"
1133
                                   " target instance: expected a subset of %r,"
1134
                                   " got %r" % (instance.disks, disks))
1135
    return disks
1136

    
1137

    
1138
def WaitForSync(lu, instance, disks=None, oneshot=False):
1139
  """Sleep and poll for an instance's disk to sync.
1140

1141
  """
1142
  if not instance.disks or disks is not None and not disks:
1143
    return True
1144

    
1145
  disks = ExpandCheckDisks(instance, disks)
1146

    
1147
  if not oneshot:
1148
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1149

    
1150
  node_uuid = instance.primary_node
1151
  node_name = lu.cfg.GetNodeName(node_uuid)
1152

    
1153
  for dev in disks:
1154
    lu.cfg.SetDiskID(dev, node_uuid)
1155

    
1156
  # TODO: Convert to utils.Retry
1157

    
1158
  retries = 0
1159
  degr_retries = 10 # in seconds, as we sleep 1 second each time
1160
  while True:
1161
    max_time = 0
1162
    done = True
1163
    cumul_degraded = False
1164
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
1165
    msg = rstats.fail_msg
1166
    if msg:
1167
      lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
1168
      retries += 1
1169
      if retries >= 10:
1170
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1171
                                 " aborting." % node_name)
1172
      time.sleep(6)
1173
      continue
1174
    rstats = rstats.payload
1175
    retries = 0
1176
    for i, mstat in enumerate(rstats):
1177
      if mstat is None:
1178
        lu.LogWarning("Can't compute data for node %s/%s",
1179
                      node_name, disks[i].iv_name)
1180
        continue
1181

    
1182
      cumul_degraded = (cumul_degraded or
1183
                        (mstat.is_degraded and mstat.sync_percent is None))
1184
      if mstat.sync_percent is not None:
1185
        done = False
1186
        if mstat.estimated_time is not None:
1187
          rem_time = ("%s remaining (estimated)" %
1188
                      utils.FormatSeconds(mstat.estimated_time))
1189
          max_time = mstat.estimated_time
1190
        else:
1191
          rem_time = "no time estimate"
1192
        lu.LogInfo("- device %s: %5.2f%% done, %s",
1193
                   disks[i].iv_name, mstat.sync_percent, rem_time)
1194

    
1195
    # if we're done but degraded, let's do a few small retries, to
1196
    # make sure we see a stable and not transient situation; therefore
1197
    # we force restart of the loop
1198
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
1199
      logging.info("Degraded disks found, %d retries left", degr_retries)
1200
      degr_retries -= 1
1201
      time.sleep(1)
1202
      continue
1203

    
1204
    if done or oneshot:
1205
      break
1206

    
1207
    time.sleep(min(60, max_time))
1208

    
1209
  if done:
1210
    lu.LogInfo("Instance %s's disks are in sync", instance.name)
1211

    
1212
  return not cumul_degraded
1213

    
1214

    
1215
def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1216
  """Shutdown block devices of an instance.
1217

1218
  This does the shutdown on all nodes of the instance.
1219

1220
  If the ignore_primary is false, errors on the primary node are
1221
  ignored.
1222

1223
  """
1224
  lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1225
  all_result = True
1226
  disks = ExpandCheckDisks(instance, disks)
1227

    
1228
  for disk in disks:
1229
    for node_uuid, top_disk in disk.ComputeNodeTree(instance.primary_node):
1230
      lu.cfg.SetDiskID(top_disk, node_uuid)
1231
      result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
1232
      msg = result.fail_msg
1233
      if msg:
1234
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1235
                      disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1236
        if ((node_uuid == instance.primary_node and not ignore_primary) or
1237
            (node_uuid != instance.primary_node and not result.offline)):
1238
          all_result = False
1239
  return all_result
1240

    
1241

    
1242
def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1243
  """Shutdown block devices of an instance.
1244

1245
  This function checks if an instance is running, before calling
1246
  _ShutdownInstanceDisks.
1247

1248
  """
1249
  CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1250
  ShutdownInstanceDisks(lu, instance, disks=disks)
1251

    
1252

    
1253
def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1254
                           ignore_size=False):
1255
  """Prepare the block devices for an instance.
1256

1257
  This sets up the block devices on all nodes.
1258

1259
  @type lu: L{LogicalUnit}
1260
  @param lu: the logical unit on whose behalf we execute
1261
  @type instance: L{objects.Instance}
1262
  @param instance: the instance for whose disks we assemble
1263
  @type disks: list of L{objects.Disk} or None
1264
  @param disks: which disks to assemble (or all, if None)
1265
  @type ignore_secondaries: boolean
1266
  @param ignore_secondaries: if true, errors on secondary nodes
1267
      won't result in an error return from the function
1268
  @type ignore_size: boolean
1269
  @param ignore_size: if true, the current known size of the disk
1270
      will not be used during the disk activation, useful for cases
1271
      when the size is wrong
1272
  @return: False if the operation failed, otherwise a list of
1273
      (host, instance_visible_name, node_visible_name)
1274
      with the mapping from node devices to instance devices
1275

1276
  """
1277
  device_info = []
1278
  disks_ok = True
1279
  disks = ExpandCheckDisks(instance, disks)
1280

    
1281
  # With the two passes mechanism we try to reduce the window of
1282
  # opportunity for the race condition of switching DRBD to primary
1283
  # before handshaking occured, but we do not eliminate it
1284

    
1285
  # The proper fix would be to wait (with some limits) until the
1286
  # connection has been made and drbd transitions from WFConnection
1287
  # into any other network-connected state (Connected, SyncTarget,
1288
  # SyncSource, etc.)
1289

    
1290
  # mark instance disks as active before doing actual work, so watcher does
1291
  # not try to shut them down erroneously
1292
  lu.cfg.MarkInstanceDisksActive(instance.uuid)
1293

    
1294
  # 1st pass, assemble on all nodes in secondary mode
1295
  for idx, inst_disk in enumerate(disks):
1296
    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
1297
                                  instance.primary_node):
1298
      if ignore_size:
1299
        node_disk = node_disk.Copy()
1300
        node_disk.UnsetSize()
1301
      lu.cfg.SetDiskID(node_disk, node_uuid)
1302
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
1303
                                             instance.name, False, idx)
1304
      msg = result.fail_msg
1305
      if msg:
1306
        is_offline_secondary = (node_uuid in instance.secondary_nodes and
1307
                                result.offline)
1308
        lu.LogWarning("Could not prepare block device %s on node %s"
1309
                      " (is_primary=False, pass=1): %s",
1310
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1311
        if not (ignore_secondaries or is_offline_secondary):
1312
          disks_ok = False
1313

    
1314
  # FIXME: race condition on drbd migration to primary
1315

    
1316
  # 2nd pass, do only the primary node
1317
  for idx, inst_disk in enumerate(disks):
1318
    dev_path = None
1319

    
1320
    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
1321
                                  instance.primary_node):
1322
      if node_uuid != instance.primary_node:
1323
        continue
1324
      if ignore_size:
1325
        node_disk = node_disk.Copy()
1326
        node_disk.UnsetSize()
1327
      lu.cfg.SetDiskID(node_disk, node_uuid)
1328
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
1329
                                             instance.name, True, idx)
1330
      msg = result.fail_msg
1331
      if msg:
1332
        lu.LogWarning("Could not prepare block device %s on node %s"
1333
                      " (is_primary=True, pass=2): %s",
1334
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1335
        disks_ok = False
1336
      else:
1337
        dev_path = result.payload
1338

    
1339
    device_info.append((lu.cfg.GetNodeName(instance.primary_node),
1340
                        inst_disk.iv_name, dev_path))
1341

    
1342
  # leave the disks configured for the primary node
1343
  # this is a workaround that would be fixed better by
1344
  # improving the logical/physical id handling
1345
  for disk in disks:
1346
    lu.cfg.SetDiskID(disk, instance.primary_node)
1347

    
1348
  if not disks_ok:
1349
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1350

    
1351
  return disks_ok, device_info
1352

    
1353

    
1354
def StartInstanceDisks(lu, instance, force):
1355
  """Start the disks of an instance.
1356

1357
  """
1358
  disks_ok, _ = AssembleInstanceDisks(lu, instance,
1359
                                      ignore_secondaries=force)
1360
  if not disks_ok:
1361
    ShutdownInstanceDisks(lu, instance)
1362
    if force is not None and not force:
1363
      lu.LogWarning("",
1364
                    hint=("If the message above refers to a secondary node,"
1365
                          " you can retry the operation using '--force'"))
1366
    raise errors.OpExecError("Disk consistency error")
1367

    
1368

    
1369
class LUInstanceGrowDisk(LogicalUnit):
1370
  """Grow a disk of an instance.
1371

1372
  """
1373
  HPATH = "disk-grow"
1374
  HTYPE = constants.HTYPE_INSTANCE
1375
  REQ_BGL = False
1376

    
1377
  def ExpandNames(self):
1378
    self._ExpandAndLockInstance()
1379
    self.needed_locks[locking.LEVEL_NODE] = []
1380
    self.needed_locks[locking.LEVEL_NODE_RES] = []
1381
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1382
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
1383

    
1384
  def DeclareLocks(self, level):
1385
    if level == locking.LEVEL_NODE:
1386
      self._LockInstancesNodes()
1387
    elif level == locking.LEVEL_NODE_RES:
1388
      # Copy node locks
1389
      self.needed_locks[locking.LEVEL_NODE_RES] = \
1390
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1391

    
1392
  def BuildHooksEnv(self):
1393
    """Build hooks env.
1394

1395
    This runs on the master, the primary and all the secondaries.
1396

1397
    """
1398
    env = {
1399
      "DISK": self.op.disk,
1400
      "AMOUNT": self.op.amount,
1401
      "ABSOLUTE": self.op.absolute,
1402
      }
1403
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
1404
    return env
1405

    
1406
  def BuildHooksNodes(self):
1407
    """Build hooks nodes.
1408

1409
    """
1410
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1411
    return (nl, nl)
1412

    
1413
  def CheckPrereq(self):
1414
    """Check prerequisites.
1415

1416
    This checks that the instance is in the cluster.
1417

1418
    """
1419
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1420
    assert self.instance is not None, \
1421
      "Cannot retrieve locked instance %s" % self.op.instance_name
1422
    node_uuids = list(self.instance.all_nodes)
1423
    for node_uuid in node_uuids:
1424
      CheckNodeOnline(self, node_uuid)
1425
    self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)
1426

    
1427
    if self.instance.disk_template not in constants.DTS_GROWABLE:
1428
      raise errors.OpPrereqError("Instance's disk layout does not support"
1429
                                 " growing", errors.ECODE_INVAL)
1430

    
1431
    self.disk = self.instance.FindDisk(self.op.disk)
1432

    
1433
    if self.op.absolute:
1434
      self.target = self.op.amount
1435
      self.delta = self.target - self.disk.size
1436
      if self.delta < 0:
1437
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
1438
                                   "current disk size (%s)" %
1439
                                   (utils.FormatUnit(self.target, "h"),
1440
                                    utils.FormatUnit(self.disk.size, "h")),
1441
                                   errors.ECODE_STATE)
1442
    else:
1443
      self.delta = self.op.amount
1444
      self.target = self.disk.size + self.delta
1445
      if self.delta < 0:
1446
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
1447
                                   utils.FormatUnit(self.delta, "h"),
1448
                                   errors.ECODE_INVAL)
1449

    
1450
    self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta))
1451

    
1452
  def _CheckDiskSpace(self, node_uuids, req_vgspace):
1453
    template = self.instance.disk_template
1454
    if (template not in (constants.DTS_NO_FREE_SPACE_CHECK) and
1455
        not any(self.node_es_flags.values())):
1456
      # TODO: check the free disk space for file, when that feature will be
1457
      # supported
1458
      # With exclusive storage we need to do something smarter than just looking
1459
      # at free space, which, in the end, is basically a dry run. So we rely on
1460
      # the dry run performed in Exec() instead.
1461
      CheckNodesFreeDiskPerVG(self, node_uuids, req_vgspace)
1462

    
1463
  def Exec(self, feedback_fn):
1464
    """Execute disk grow.
1465

1466
    """
1467
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1468
    assert (self.owned_locks(locking.LEVEL_NODE) ==
1469
            self.owned_locks(locking.LEVEL_NODE_RES))
1470

    
1471
    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1472

    
1473
    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[self.disk])
1474
    if not disks_ok:
1475
      raise errors.OpExecError("Cannot activate block device to grow")
1476

    
1477
    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1478
                (self.op.disk, self.instance.name,
1479
                 utils.FormatUnit(self.delta, "h"),
1480
                 utils.FormatUnit(self.target, "h")))
1481

    
1482
    # First run all grow ops in dry-run mode
1483
    for node_uuid in self.instance.all_nodes:
1484
      self.cfg.SetDiskID(self.disk, node_uuid)
1485
      result = self.rpc.call_blockdev_grow(node_uuid,
1486
                                           (self.disk, self.instance),
1487
                                           self.delta, True, True,
1488
                                           self.node_es_flags[node_uuid])
1489
      result.Raise("Dry-run grow request failed to node %s" %
1490
                   self.cfg.GetNodeName(node_uuid))
1491

    
1492
    if wipe_disks:
1493
      # Get disk size from primary node for wiping
1494
      self.cfg.SetDiskID(self.disk, self.instance.primary_node)
1495
      result = self.rpc.call_blockdev_getdimensions(self.instance.primary_node,
1496
                                                    [self.disk])
1497
      result.Raise("Failed to retrieve disk size from node '%s'" %
1498
                   self.instance.primary_node)
1499

    
1500
      (disk_dimensions, ) = result.payload
1501

    
1502
      if disk_dimensions is None:
1503
        raise errors.OpExecError("Failed to retrieve disk size from primary"
1504
                                 " node '%s'" % self.instance.primary_node)
1505
      (disk_size_in_bytes, _) = disk_dimensions
1506

    
1507
      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
1508

    
1509
      assert old_disk_size >= self.disk.size, \
1510
        ("Retrieved disk size too small (got %s, should be at least %s)" %
1511
         (old_disk_size, self.disk.size))
1512
    else:
1513
      old_disk_size = None
1514

    
1515
    # We know that (as far as we can test) operations across different
1516
    # nodes will succeed, time to run it for real on the backing storage
1517
    for node_uuid in self.instance.all_nodes:
1518
      self.cfg.SetDiskID(self.disk, node_uuid)
1519
      result = self.rpc.call_blockdev_grow(node_uuid,
1520
                                           (self.disk, self.instance),
1521
                                           self.delta, False, True,
1522
                                           self.node_es_flags[node_uuid])
1523
      result.Raise("Grow request failed to node %s" %
1524
                   self.cfg.GetNodeName(node_uuid))
1525

    
1526
    # And now execute it for logical storage, on the primary node
1527
    node_uuid = self.instance.primary_node
1528
    self.cfg.SetDiskID(self.disk, node_uuid)
1529
    result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance),
1530
                                         self.delta, False, False,
1531
                                         self.node_es_flags[node_uuid])
1532
    result.Raise("Grow request failed to node %s" %
1533
                 self.cfg.GetNodeName(node_uuid))
1534

    
1535
    self.disk.RecordGrow(self.delta)
1536
    self.cfg.Update(self.instance, feedback_fn)
1537

    
1538
    # Changes have been recorded, release node lock
1539
    ReleaseLocks(self, locking.LEVEL_NODE)
1540

    
1541
    # Downgrade lock while waiting for sync
1542
    self.glm.downgrade(locking.LEVEL_INSTANCE)
1543

    
1544
    assert wipe_disks ^ (old_disk_size is None)
1545

    
1546
    if wipe_disks:
1547
      assert self.instance.disks[self.op.disk] == self.disk
1548

    
1549
      # Wipe newly added disk space
1550
      WipeDisks(self, self.instance,
1551
                disks=[(self.op.disk, self.disk, old_disk_size)])
1552

    
1553
    if self.op.wait_for_sync:
1554
      disk_abort = not WaitForSync(self, self.instance, disks=[self.disk])
1555
      if disk_abort:
1556
        self.LogWarning("Disk syncing has not returned a good status; check"
1557
                        " the instance")
1558
      if not self.instance.disks_active:
1559
        _SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk])
1560
    elif not self.instance.disks_active:
1561
      self.LogWarning("Not shutting down the disk even if the instance is"
1562
                      " not supposed to be running because no wait for"
1563
                      " sync mode was requested")
1564

    
1565
    assert self.owned_locks(locking.LEVEL_NODE_RES)
1566
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1567

    
1568

    
1569
class LUInstanceReplaceDisks(LogicalUnit):
1570
  """Replace the disks of an instance.
1571

1572
  """
1573
  HPATH = "mirrors-replace"
1574
  HTYPE = constants.HTYPE_INSTANCE
1575
  REQ_BGL = False
1576

    
1577
  def CheckArguments(self):
1578
    """Check arguments.
1579

1580
    """
1581
    if self.op.mode == constants.REPLACE_DISK_CHG:
1582
      if self.op.remote_node is None and self.op.iallocator is None:
1583
        raise errors.OpPrereqError("When changing the secondary either an"
1584
                                   " iallocator script must be used or the"
1585
                                   " new node given", errors.ECODE_INVAL)
1586
      else:
1587
        CheckIAllocatorOrNode(self, "iallocator", "remote_node")
1588

    
1589
    elif self.op.remote_node is not None or self.op.iallocator is not None:
1590
      # Not replacing the secondary
1591
      raise errors.OpPrereqError("The iallocator and new node options can"
1592
                                 " only be used when changing the"
1593
                                 " secondary node", errors.ECODE_INVAL)
1594

    
1595
  def ExpandNames(self):
1596
    self._ExpandAndLockInstance()
1597

    
1598
    assert locking.LEVEL_NODE not in self.needed_locks
1599
    assert locking.LEVEL_NODE_RES not in self.needed_locks
1600
    assert locking.LEVEL_NODEGROUP not in self.needed_locks
1601

    
1602
    assert self.op.iallocator is None or self.op.remote_node is None, \
1603
      "Conflicting options"
1604

    
1605
    if self.op.remote_node is not None:
1606
      (self.op.remote_node_uuid, self.op.remote_node) = \
1607
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
1608
                              self.op.remote_node)
1609

    
1610
      # Warning: do not remove the locking of the new secondary here
1611
      # unless DRBD8Dev.AddChildren is changed to work in parallel;
1612
      # currently it doesn't since parallel invocations of
1613
      # FindUnusedMinor will conflict
1614
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid]
1615
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1616
    else:
1617
      self.needed_locks[locking.LEVEL_NODE] = []
1618
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1619

    
1620
      if self.op.iallocator is not None:
1621
        # iallocator will select a new node in the same group
1622
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
1623
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1624

    
1625
    self.needed_locks[locking.LEVEL_NODE_RES] = []
1626

    
1627
    self.replacer = TLReplaceDisks(self, self.op.instance_uuid,
1628
                                   self.op.instance_name, self.op.mode,
1629
                                   self.op.iallocator, self.op.remote_node_uuid,
1630
                                   self.op.disks, self.op.early_release,
1631
                                   self.op.ignore_ipolicy)
1632

    
1633
    self.tasklets = [self.replacer]
1634

    
1635
  def DeclareLocks(self, level):
1636
    if level == locking.LEVEL_NODEGROUP:
1637
      assert self.op.remote_node_uuid is None
1638
      assert self.op.iallocator is not None
1639
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
1640

    
1641
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
1642
      # Lock all groups used by instance optimistically; this requires going
1643
      # via the node before it's locked, requiring verification later on
1644
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
1645
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid)
1646

    
1647
    elif level == locking.LEVEL_NODE:
1648
      if self.op.iallocator is not None:
1649
        assert self.op.remote_node_uuid is None
1650
        assert not self.needed_locks[locking.LEVEL_NODE]
1651
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
1652

    
1653
        # Lock member nodes of all locked groups
1654
        self.needed_locks[locking.LEVEL_NODE] = \
1655
          [node_uuid
1656
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1657
           for node_uuid in self.cfg.GetNodeGroup(group_uuid).members]
1658
      else:
1659
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1660

    
1661
        self._LockInstancesNodes()
1662

    
1663
    elif level == locking.LEVEL_NODE_RES:
1664
      # Reuse node locks
1665
      self.needed_locks[locking.LEVEL_NODE_RES] = \
1666
        self.needed_locks[locking.LEVEL_NODE]
1667

    
1668
  def BuildHooksEnv(self):
1669
    """Build hooks env.
1670

1671
    This runs on the master, the primary and all the secondaries.
1672

1673
    """
1674
    instance = self.replacer.instance
1675
    env = {
1676
      "MODE": self.op.mode,
1677
      "NEW_SECONDARY": self.op.remote_node,
1678
      "OLD_SECONDARY": self.cfg.GetNodeName(instance.secondary_nodes[0]),
1679
      }
1680
    env.update(BuildInstanceHookEnvByObject(self, instance))
1681
    return env
1682

    
1683
  def BuildHooksNodes(self):
1684
    """Build hooks nodes.
1685

1686
    """
1687
    instance = self.replacer.instance
1688
    nl = [
1689
      self.cfg.GetMasterNode(),
1690
      instance.primary_node,
1691
      ]
1692
    if self.op.remote_node_uuid is not None:
1693
      nl.append(self.op.remote_node_uuid)
1694
    return nl, nl
1695

    
1696
  def CheckPrereq(self):
1697
    """Check prerequisites.
1698

1699
    """
1700
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
1701
            self.op.iallocator is None)
1702

    
1703
    # Verify if node group locks are still correct
1704
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
1705
    if owned_groups:
1706
      CheckInstanceNodeGroups(self.cfg, self.op.instance_uuid, owned_groups)
1707

    
1708
    return LogicalUnit.CheckPrereq(self)
1709

    
1710

    
1711
class LUInstanceActivateDisks(NoHooksLU):
1712
  """Bring up an instance's disks.
1713

1714
  """
1715
  REQ_BGL = False
1716

    
1717
  def ExpandNames(self):
1718
    self._ExpandAndLockInstance()
1719
    self.needed_locks[locking.LEVEL_NODE] = []
1720
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1721

    
1722
  def DeclareLocks(self, level):
1723
    if level == locking.LEVEL_NODE:
1724
      self._LockInstancesNodes()
1725

    
1726
  def CheckPrereq(self):
1727
    """Check prerequisites.
1728

1729
    This checks that the instance is in the cluster.
1730

1731
    """
1732
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1733
    assert self.instance is not None, \
1734
      "Cannot retrieve locked instance %s" % self.op.instance_name
1735
    CheckNodeOnline(self, self.instance.primary_node)
1736

    
1737
  def Exec(self, feedback_fn):
1738
    """Activate the disks.
1739

1740
    """
1741
    disks_ok, disks_info = \
1742
              AssembleInstanceDisks(self, self.instance,
1743
                                    ignore_size=self.op.ignore_size)
1744
    if not disks_ok:
1745
      raise errors.OpExecError("Cannot activate block devices")
1746

    
1747
    if self.op.wait_for_sync:
1748
      if not WaitForSync(self, self.instance):
1749
        self.cfg.MarkInstanceDisksInactive(self.instance.uuid)
1750
        raise errors.OpExecError("Some disks of the instance are degraded!")
1751

    
1752
    return disks_info
1753

    
1754

    
1755
class LUInstanceDeactivateDisks(NoHooksLU):
1756
  """Shutdown an instance's disks.
1757

1758
  """
1759
  REQ_BGL = False
1760

    
1761
  def ExpandNames(self):
1762
    self._ExpandAndLockInstance()
1763
    self.needed_locks[locking.LEVEL_NODE] = []
1764
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1765

    
1766
  def DeclareLocks(self, level):
1767
    if level == locking.LEVEL_NODE:
1768
      self._LockInstancesNodes()
1769

    
1770
  def CheckPrereq(self):
1771
    """Check prerequisites.
1772

1773
    This checks that the instance is in the cluster.
1774

1775
    """
1776
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1777
    assert self.instance is not None, \
1778
      "Cannot retrieve locked instance %s" % self.op.instance_name
1779

    
1780
  def Exec(self, feedback_fn):
1781
    """Deactivate the disks
1782

1783
    """
1784
    if self.op.force:
1785
      ShutdownInstanceDisks(self, self.instance)
1786
    else:
1787
      _SafeShutdownInstanceDisks(self, self.instance)
1788

    
1789

    
1790
def _CheckDiskConsistencyInner(lu, instance, dev, node_uuid, on_primary,
1791
                               ldisk=False):
1792
  """Check that mirrors are not degraded.
1793

1794
  @attention: The device has to be annotated already.
1795

1796
  The ldisk parameter, if True, will change the test from the
1797
  is_degraded attribute (which represents overall non-ok status for
1798
  the device(s)) to the ldisk (representing the local storage status).
1799

1800
  """
1801
  lu.cfg.SetDiskID(dev, node_uuid)
1802

    
1803
  result = True
1804

    
1805
  if on_primary or dev.AssembleOnSecondary():
1806
    rstats = lu.rpc.call_blockdev_find(node_uuid, dev)
1807
    msg = rstats.fail_msg
1808
    if msg:
1809
      lu.LogWarning("Can't find disk on node %s: %s",
1810
                    lu.cfg.GetNodeName(node_uuid), msg)
1811
      result = False
1812
    elif not rstats.payload:
1813
      lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid))
1814
      result = False
1815
    else:
1816
      if ldisk:
1817
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
1818
      else:
1819
        result = result and not rstats.payload.is_degraded
1820

    
1821
  if dev.children:
1822
    for child in dev.children:
1823
      result = result and _CheckDiskConsistencyInner(lu, instance, child,
1824
                                                     node_uuid, on_primary)
1825

    
1826
  return result
1827

    
1828

    
1829
def CheckDiskConsistency(lu, instance, dev, node_uuid, on_primary, ldisk=False):
1830
  """Wrapper around L{_CheckDiskConsistencyInner}.
1831

1832
  """
1833
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1834
  return _CheckDiskConsistencyInner(lu, instance, disk, node_uuid, on_primary,
1835
                                    ldisk=ldisk)
1836

    
1837

    
1838
def _BlockdevFind(lu, node_uuid, dev, instance):
1839
  """Wrapper around call_blockdev_find to annotate diskparams.
1840

1841
  @param lu: A reference to the lu object
1842
  @param node_uuid: The node to call out
1843
  @param dev: The device to find
1844
  @param instance: The instance object the device belongs to
1845
  @returns The result of the rpc call
1846

1847
  """
1848
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1849
  return lu.rpc.call_blockdev_find(node_uuid, disk)
1850

    
1851

    
1852
def _GenerateUniqueNames(lu, exts):
1853
  """Generate a suitable LV name.
1854

1855
  This will generate a logical volume name for the given instance.
1856

1857
  """
1858
  results = []
1859
  for val in exts:
1860
    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
1861
    results.append("%s%s" % (new_id, val))
1862
  return results
1863

    
1864

    
1865
class TLReplaceDisks(Tasklet):
1866
  """Replaces disks for an instance.
1867

1868
  Note: Locking is not within the scope of this class.
1869

1870
  """
1871
  def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name,
1872
               remote_node_uuid, disks, early_release, ignore_ipolicy):
1873
    """Initializes this class.
1874

1875
    """
1876
    Tasklet.__init__(self, lu)
1877

    
1878
    # Parameters
1879
    self.instance_uuid = instance_uuid
1880
    self.instance_name = instance_name
1881
    self.mode = mode
1882
    self.iallocator_name = iallocator_name
1883
    self.remote_node_uuid = remote_node_uuid
1884
    self.disks = disks
1885
    self.early_release = early_release
1886
    self.ignore_ipolicy = ignore_ipolicy
1887

    
1888
    # Runtime data
1889
    self.instance = None
1890
    self.new_node_uuid = None
1891
    self.target_node_uuid = None
1892
    self.other_node_uuid = None
1893
    self.remote_node_info = None
1894
    self.node_secondary_ip = None
1895

    
1896
  @staticmethod
1897
  def _RunAllocator(lu, iallocator_name, instance_uuid,
1898
                    relocate_from_node_uuids):
1899
    """Compute a new secondary node using an IAllocator.
1900

1901
    """
1902
    req = iallocator.IAReqRelocate(
1903
          inst_uuid=instance_uuid,
1904
          relocate_from_node_uuids=list(relocate_from_node_uuids))
1905
    ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
1906

    
1907
    ial.Run(iallocator_name)
1908

    
1909
    if not ial.success:
1910
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
1911
                                 " %s" % (iallocator_name, ial.info),
1912
                                 errors.ECODE_NORES)
1913

    
1914
    remote_node_name = ial.result[0]
1915
    remote_node = lu.cfg.GetNodeInfoByName(remote_node_name)
1916

    
1917
    if remote_node is None:
1918
      raise errors.OpPrereqError("Node %s not found in configuration" %
1919
                                 remote_node_name, errors.ECODE_NOENT)
1920

    
1921
    lu.LogInfo("Selected new secondary for instance '%s': %s",
1922
               instance_uuid, remote_node_name)
1923

    
1924
    return remote_node.uuid
1925

    
1926
  def _FindFaultyDisks(self, node_uuid):
1927
    """Wrapper for L{FindFaultyInstanceDisks}.
1928

1929
    """
1930
    return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
1931
                                   node_uuid, True)
1932

    
1933
  def _CheckDisksActivated(self, instance):
1934
    """Checks if the instance disks are activated.
1935

1936
    @param instance: The instance to check disks
1937
    @return: True if they are activated, False otherwise
1938

1939
    """
1940
    node_uuids = instance.all_nodes
1941

    
1942
    for idx, dev in enumerate(instance.disks):
1943
      for node_uuid in node_uuids:
1944
        self.lu.LogInfo("Checking disk/%d on %s", idx,
1945
                        self.cfg.GetNodeName(node_uuid))
1946
        self.cfg.SetDiskID(dev, node_uuid)
1947

    
1948
        result = _BlockdevFind(self, node_uuid, dev, instance)
1949

    
1950
        if result.offline:
1951
          continue
1952
        elif result.fail_msg or not result.payload:
1953
          return False
1954

    
1955
    return True
1956

    
1957
  def CheckPrereq(self):
1958
    """Check prerequisites.
1959

1960
    This checks that the instance is in the cluster.
1961

1962
    """
1963
    self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
1964
    assert self.instance is not None, \
1965
      "Cannot retrieve locked instance %s" % self.instance_name
1966

    
1967
    if self.instance.disk_template != constants.DT_DRBD8:
1968
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
1969
                                 " instances", errors.ECODE_INVAL)
1970

    
1971
    if len(self.instance.secondary_nodes) != 1:
1972
      raise errors.OpPrereqError("The instance has a strange layout,"
1973
                                 " expected one secondary but found %d" %
1974
                                 len(self.instance.secondary_nodes),
1975
                                 errors.ECODE_FAULT)
1976

    
1977
    secondary_node_uuid = self.instance.secondary_nodes[0]
1978

    
1979
    if self.iallocator_name is None:
1980
      remote_node_uuid = self.remote_node_uuid
1981
    else:
1982
      remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
1983
                                            self.instance.uuid,
1984
                                            self.instance.secondary_nodes)
1985

    
1986
    if remote_node_uuid is None:
1987
      self.remote_node_info = None
1988
    else:
1989
      assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
1990
             "Remote node '%s' is not locked" % remote_node_uuid
1991

    
1992
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
1993
      assert self.remote_node_info is not None, \
1994
        "Cannot retrieve locked node %s" % remote_node_uuid
1995

    
1996
    if remote_node_uuid == self.instance.primary_node:
1997
      raise errors.OpPrereqError("The specified node is the primary node of"
1998
                                 " the instance", errors.ECODE_INVAL)
1999

    
2000
    if remote_node_uuid == secondary_node_uuid:
2001
      raise errors.OpPrereqError("The specified node is already the"
2002
                                 " secondary node of the instance",
2003
                                 errors.ECODE_INVAL)
2004

    
2005
    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
2006
                                    constants.REPLACE_DISK_CHG):
2007
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
2008
                                 errors.ECODE_INVAL)
2009

    
2010
    if self.mode == constants.REPLACE_DISK_AUTO:
2011
      if not self._CheckDisksActivated(self.instance):
2012
        raise errors.OpPrereqError("Please run activate-disks on instance %s"
2013
                                   " first" % self.instance_name,
2014
                                   errors.ECODE_STATE)
2015
      faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
2016
      faulty_secondary = self._FindFaultyDisks(secondary_node_uuid)
2017

    
2018
      if faulty_primary and faulty_secondary:
2019
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
2020
                                   " one node and can not be repaired"
2021
                                   " automatically" % self.instance_name,
2022
                                   errors.ECODE_STATE)
2023

    
2024
      if faulty_primary:
2025
        self.disks = faulty_primary
2026
        self.target_node_uuid = self.instance.primary_node
2027
        self.other_node_uuid = secondary_node_uuid
2028
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
2029
      elif faulty_secondary:
2030
        self.disks = faulty_secondary
2031
        self.target_node_uuid = secondary_node_uuid
2032
        self.other_node_uuid = self.instance.primary_node
2033
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
2034
      else:
2035
        self.disks = []
2036
        check_nodes = []
2037

    
2038
    else:
2039
      # Non-automatic modes
2040
      if self.mode == constants.REPLACE_DISK_PRI:
2041
        self.target_node_uuid = self.instance.primary_node
2042
        self.other_node_uuid = secondary_node_uuid
2043
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
2044

    
2045
      elif self.mode == constants.REPLACE_DISK_SEC:
2046
        self.target_node_uuid = secondary_node_uuid
2047
        self.other_node_uuid = self.instance.primary_node
2048
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
2049

    
2050
      elif self.mode == constants.REPLACE_DISK_CHG:
2051
        self.new_node_uuid = remote_node_uuid
2052
        self.other_node_uuid = self.instance.primary_node
2053
        self.target_node_uuid = secondary_node_uuid
2054
        check_nodes = [self.new_node_uuid, self.other_node_uuid]
2055

    
2056
        CheckNodeNotDrained(self.lu, remote_node_uuid)
2057
        CheckNodeVmCapable(self.lu, remote_node_uuid)
2058

    
2059
        old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid)
2060
        assert old_node_info is not None
2061
        if old_node_info.offline and not self.early_release:
2062
          # doesn't make sense to delay the release
2063
          self.early_release = True
2064
          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
2065
                          " early-release mode", secondary_node_uuid)
2066

    
2067
      else:
2068
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
2069
                                     self.mode)
2070

    
2071
      # If not specified all disks should be replaced
2072
      if not self.disks:
2073
        self.disks = range(len(self.instance.disks))
2074

    
2075
    # TODO: This is ugly, but right now we can't distinguish between internal
2076
    # submitted opcode and external one. We should fix that.
2077
    if self.remote_node_info:
2078
      # We change the node, lets verify it still meets instance policy
2079
      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
2080
      cluster = self.cfg.GetClusterInfo()
2081
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2082
                                                              new_group_info)
2083
      CheckTargetNodeIPolicy(self, ipolicy, self.instance,
2084
                             self.remote_node_info, self.cfg,
2085
                             ignore=self.ignore_ipolicy)
2086

    
2087
    for node_uuid in check_nodes:
2088
      CheckNodeOnline(self.lu, node_uuid)
2089

    
2090
    touched_nodes = frozenset(node_uuid for node_uuid in [self.new_node_uuid,
2091
                                                          self.other_node_uuid,
2092
                                                          self.target_node_uuid]
2093
                              if node_uuid is not None)
2094

    
2095
    # Release unneeded node and node resource locks
2096
    ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
2097
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
2098
    ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
2099

    
2100
    # Release any owned node group
2101
    ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
2102

    
2103
    # Check whether disks are valid
2104
    for disk_idx in self.disks:
2105
      self.instance.FindDisk(disk_idx)
2106

    
2107
    # Get secondary node IP addresses
2108
    self.node_secondary_ip = dict((uuid, node.secondary_ip) for (uuid, node)
2109
                                  in self.cfg.GetMultiNodeInfo(touched_nodes))
2110

    
2111
  def Exec(self, feedback_fn):
2112
    """Execute disk replacement.
2113

2114
    This dispatches the disk replacement to the appropriate handler.
2115

2116
    """
2117
    if __debug__:
2118
      # Verify owned locks before starting operation
2119
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
2120
      assert set(owned_nodes) == set(self.node_secondary_ip), \
2121
          ("Incorrect node locks, owning %s, expected %s" %
2122
           (owned_nodes, self.node_secondary_ip.keys()))
2123
      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
2124
              self.lu.owned_locks(locking.LEVEL_NODE_RES))
2125
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
2126

    
2127
      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
2128
      assert list(owned_instances) == [self.instance_name], \
2129
          "Instance '%s' not locked" % self.instance_name
2130

    
2131
      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
2132
          "Should not own any node group lock at this point"
2133

    
2134
    if not self.disks:
2135
      feedback_fn("No disks need replacement for instance '%s'" %
2136
                  self.instance.name)
2137
      return
2138

    
2139
    feedback_fn("Replacing disk(s) %s for instance '%s'" %
2140
                (utils.CommaJoin(self.disks), self.instance.name))
2141
    feedback_fn("Current primary node: %s" %
2142
                self.cfg.GetNodeName(self.instance.primary_node))
2143
    feedback_fn("Current seconary node: %s" %
2144
                utils.CommaJoin(self.cfg.GetNodeNames(
2145
                                  self.instance.secondary_nodes)))
2146

    
2147
    activate_disks = not self.instance.disks_active
2148

    
2149
    # Activate the instance disks if we're replacing them on a down instance
2150
    if activate_disks:
2151
      StartInstanceDisks(self.lu, self.instance, True)
2152

    
2153
    try:
2154
      # Should we replace the secondary node?
2155
      if self.new_node_uuid is not None:
2156
        fn = self._ExecDrbd8Secondary
2157
      else:
2158
        fn = self._ExecDrbd8DiskOnly
2159

    
2160
      result = fn(feedback_fn)
2161
    finally:
2162
      # Deactivate the instance disks if we're replacing them on a
2163
      # down instance
2164
      if activate_disks:
2165
        _SafeShutdownInstanceDisks(self.lu, self.instance)
2166

    
2167
    assert not self.lu.owned_locks(locking.LEVEL_NODE)
2168

    
2169
    if __debug__:
2170
      # Verify owned locks
2171
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
2172
      nodes = frozenset(self.node_secondary_ip)
2173
      assert ((self.early_release and not owned_nodes) or
2174
              (not self.early_release and not (set(owned_nodes) - nodes))), \
2175
        ("Not owning the correct locks, early_release=%s, owned=%r,"
2176
         " nodes=%r" % (self.early_release, owned_nodes, nodes))
2177

    
2178
    return result
2179

    
2180
  def _CheckVolumeGroup(self, node_uuids):
2181
    self.lu.LogInfo("Checking volume groups")
2182

    
2183
    vgname = self.cfg.GetVGName()
2184

    
2185
    # Make sure volume group exists on all involved nodes
2186
    results = self.rpc.call_vg_list(node_uuids)
2187
    if not results:
2188
      raise errors.OpExecError("Can't list volume groups on the nodes")
2189

    
2190
    for node_uuid in node_uuids:
2191
      res = results[node_uuid]
2192
      res.Raise("Error checking node %s" % self.cfg.GetNodeName(node_uuid))
2193
      if vgname not in res.payload:
2194
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
2195
                                 (vgname, self.cfg.GetNodeName(node_uuid)))
2196

    
2197
  def _CheckDisksExistence(self, node_uuids):
2198
    # Check disk existence
2199
    for idx, dev in enumerate(self.instance.disks):
2200
      if idx not in self.disks:
2201
        continue
2202

    
2203
      for node_uuid in node_uuids:
2204
        self.lu.LogInfo("Checking disk/%d on %s", idx,
2205
                        self.cfg.GetNodeName(node_uuid))
2206
        self.cfg.SetDiskID(dev, node_uuid)
2207

    
2208
        result = _BlockdevFind(self, node_uuid, dev, self.instance)
2209

    
2210
        msg = result.fail_msg
2211
        if msg or not result.payload:
2212
          if not msg:
2213
            msg = "disk not found"
2214
          if not self._CheckDisksActivated(self.instance):
2215
            extra_hint = ("\nDisks seem to be not properly activated. Try"
2216
                          " running activate-disks on the instance before"
2217
                          " using replace-disks.")
2218
          else:
2219
            extra_hint = ""
2220
          raise errors.OpExecError("Can't find disk/%d on node %s: %s%s" %
2221
                                   (idx, self.cfg.GetNodeName(node_uuid), msg,
2222
                                    extra_hint))
2223

    
2224
  def _CheckDisksConsistency(self, node_uuid, on_primary, ldisk):
2225
    for idx, dev in enumerate(self.instance.disks):
2226
      if idx not in self.disks:
2227
        continue
2228

    
2229
      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
2230
                      (idx, self.cfg.GetNodeName(node_uuid)))
2231

    
2232
      if not CheckDiskConsistency(self.lu, self.instance, dev, node_uuid,
2233
                                  on_primary, ldisk=ldisk):
2234
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
2235
                                 " replace disks for instance %s" %
2236
                                 (self.cfg.GetNodeName(node_uuid),
2237
                                  self.instance.name))
2238

    
2239
  def _CreateNewStorage(self, node_uuid):
2240
    """Create new storage on the primary or secondary node.
2241

2242
    This is only used for same-node replaces, not for changing the
2243
    secondary node, hence we don't want to modify the existing disk.
2244

2245
    """
2246
    iv_names = {}
2247

    
2248
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2249
    for idx, dev in enumerate(disks):
2250
      if idx not in self.disks:
2251
        continue
2252

    
2253
      self.lu.LogInfo("Adding storage on %s for disk/%d",
2254
                      self.cfg.GetNodeName(node_uuid), idx)
2255

    
2256
      self.cfg.SetDiskID(dev, node_uuid)
2257

    
2258
      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
2259
      names = _GenerateUniqueNames(self.lu, lv_names)
2260

    
2261
      (data_disk, meta_disk) = dev.children
2262
      vg_data = data_disk.logical_id[0]
2263
      lv_data = objects.Disk(dev_type=constants.DT_PLAIN, size=dev.size,
2264
                             logical_id=(vg_data, names[0]),
2265
                             params=data_disk.params)
2266
      vg_meta = meta_disk.logical_id[0]
2267
      lv_meta = objects.Disk(dev_type=constants.DT_PLAIN,
2268
                             size=constants.DRBD_META_SIZE,
2269
                             logical_id=(vg_meta, names[1]),
2270
                             params=meta_disk.params)
2271

    
2272
      new_lvs = [lv_data, lv_meta]
2273
      old_lvs = [child.Copy() for child in dev.children]
2274
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
2275
      excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)
2276

    
2277
      # we pass force_create=True to force the LVM creation
2278
      for new_lv in new_lvs:
2279
        try:
2280
          _CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True,
2281
                               GetInstanceInfoText(self.instance), False,
2282
                               excl_stor)
2283
        except errors.DeviceCreationError, e:
2284
          raise errors.OpExecError("Can't create block device: %s" % e.message)
2285

    
2286
    return iv_names
2287

    
2288
  def _CheckDevices(self, node_uuid, iv_names):
2289
    for name, (dev, _, _) in iv_names.iteritems():
2290
      self.cfg.SetDiskID(dev, node_uuid)
2291

    
2292
      result = _BlockdevFind(self, node_uuid, dev, self.instance)
2293

    
2294
      msg = result.fail_msg
2295
      if msg or not result.payload:
2296
        if not msg:
2297
          msg = "disk not found"
2298
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
2299
                                 (name, msg))
2300

    
2301
      if result.payload.is_degraded:
2302
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
2303

    
2304
  def _RemoveOldStorage(self, node_uuid, iv_names):
2305
    for name, (_, old_lvs, _) in iv_names.iteritems():
2306
      self.lu.LogInfo("Remove logical volumes for %s", name)
2307

    
2308
      for lv in old_lvs:
2309
        self.cfg.SetDiskID(lv, node_uuid)
2310

    
2311
        msg = self.rpc.call_blockdev_remove(node_uuid, lv).fail_msg
2312
        if msg:
2313
          self.lu.LogWarning("Can't remove old LV: %s", msg,
2314
                             hint="remove unused LVs manually")
2315

    
2316
  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
2317
    """Replace a disk on the primary or secondary for DRBD 8.
2318

2319
    The algorithm for replace is quite complicated:
2320

2321
      1. for each disk to be replaced:
2322

2323
        1. create new LVs on the target node with unique names
2324
        1. detach old LVs from the drbd device
2325
        1. rename old LVs to name_replaced.<time_t>
2326
        1. rename new LVs to old LVs
2327
        1. attach the new LVs (with the old names now) to the drbd device
2328

2329
      1. wait for sync across all devices
2330

2331
      1. for each modified disk:
2332

2333
        1. remove old LVs (which have the name name_replaces.<time_t>)
2334

2335
    Failures are not very well handled.
2336

2337
    """
2338
    steps_total = 6
2339

    
2340
    # Step: check device activation
2341
    self.lu.LogStep(1, steps_total, "Check device existence")
2342
    self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
2343
    self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])
2344

    
2345
    # Step: check other node consistency
2346
    self.lu.LogStep(2, steps_total, "Check peer consistency")
2347
    self._CheckDisksConsistency(
2348
      self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
2349
      False)
2350

    
2351
    # Step: create new storage
2352
    self.lu.LogStep(3, steps_total, "Allocate new storage")
2353
    iv_names = self._CreateNewStorage(self.target_node_uuid)
2354

    
2355
    # Step: for each lv, detach+rename*2+attach
2356
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2357
    for dev, old_lvs, new_lvs in iv_names.itervalues():
2358
      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
2359

    
2360
      result = self.rpc.call_blockdev_removechildren(self.target_node_uuid, dev,
2361
                                                     old_lvs)
2362
      result.Raise("Can't detach drbd from local storage on node"
2363
                   " %s for device %s" %
2364
                   (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))
2365
      #dev.children = []
2366
      #cfg.Update(instance)
2367

    
2368
      # ok, we created the new LVs, so now we know we have the needed
2369
      # storage; as such, we proceed on the target node to rename
2370
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
2371
      # using the assumption that logical_id == physical_id (which in
2372
      # turn is the unique_id on that node)
2373

    
2374
      # FIXME(iustin): use a better name for the replaced LVs
2375
      temp_suffix = int(time.time())
2376
      ren_fn = lambda d, suff: (d.physical_id[0],
2377
                                d.physical_id[1] + "_replaced-%s" % suff)
2378

    
2379
      # Build the rename list based on what LVs exist on the node
2380
      rename_old_to_new = []
2381
      for to_ren in old_lvs:
2382
        result = self.rpc.call_blockdev_find(self.target_node_uuid, to_ren)
2383
        if not result.fail_msg and result.payload:
2384
          # device exists
2385
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
2386

    
2387
      self.lu.LogInfo("Renaming the old LVs on the target node")
2388
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2389
                                             rename_old_to_new)
2390
      result.Raise("Can't rename old LVs on node %s" %
2391
                   self.cfg.GetNodeName(self.target_node_uuid))
2392

    
2393
      # Now we rename the new LVs to the old LVs
2394
      self.lu.LogInfo("Renaming the new LVs on the target node")
2395
      rename_new_to_old = [(new, old.physical_id)
2396
                           for old, new in zip(old_lvs, new_lvs)]
2397
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2398
                                             rename_new_to_old)
2399
      result.Raise("Can't rename new LVs on node %s" %
2400
                   self.cfg.GetNodeName(self.target_node_uuid))
2401

    
2402
      # Intermediate steps of in memory modifications
2403
      for old, new in zip(old_lvs, new_lvs):
2404
        new.logical_id = old.logical_id
2405
        self.cfg.SetDiskID(new, self.target_node_uuid)
2406

    
2407
      # We need to modify old_lvs so that removal later removes the
2408
      # right LVs, not the newly added ones; note that old_lvs is a
2409
      # copy here
2410
      for disk in old_lvs:
2411
        disk.logical_id = ren_fn(disk, temp_suffix)
2412
        self.cfg.SetDiskID(disk, self.target_node_uuid)
2413

    
2414
      # Now that the new lvs have the old name, we can add them to the device
2415
      self.lu.LogInfo("Adding new mirror component on %s",
2416
                      self.cfg.GetNodeName(self.target_node_uuid))
2417
      result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
2418
                                                  (dev, self.instance), new_lvs)
2419
      msg = result.fail_msg
2420
      if msg:
2421
        for new_lv in new_lvs:
2422
          msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
2423
                                               new_lv).fail_msg
2424
          if msg2:
2425
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
2426
                               hint=("cleanup manually the unused logical"
2427
                                     "volumes"))
2428
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
2429

    
2430
    cstep = itertools.count(5)
2431

    
2432
    if self.early_release:
2433
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2434
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
2435
      # TODO: Check if releasing locks early still makes sense
2436
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2437
    else:
2438
      # Release all resource locks except those used by the instance
2439
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2440
                   keep=self.node_secondary_ip.keys())
2441

    
2442
    # Release all node locks while waiting for sync
2443
    ReleaseLocks(self.lu, locking.LEVEL_NODE)
2444

    
2445
    # TODO: Can the instance lock be downgraded here? Take the optional disk
2446
    # shutdown in the caller into consideration.
2447

    
2448
    # Wait for sync
2449
    # This can fail as the old devices are degraded and _WaitForSync
2450
    # does a combined result over all disks, so we don't check its return value
2451
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2452
    WaitForSync(self.lu, self.instance)
2453

    
2454
    # Check all devices manually
2455
    self._CheckDevices(self.instance.primary_node, iv_names)
2456

    
2457
    # Step: remove old storage
2458
    if not self.early_release:
2459
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2460
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
2461

    
2462
  def _ExecDrbd8Secondary(self, feedback_fn):
2463
    """Replace the secondary node for DRBD 8.
2464

2465
    The algorithm for replace is quite complicated:
2466
      - for all disks of the instance:
2467
        - create new LVs on the new node with same names
2468
        - shutdown the drbd device on the old secondary
2469
        - disconnect the drbd network on the primary
2470
        - create the drbd device on the new secondary
2471
        - network attach the drbd on the primary, using an artifice:
2472
          the drbd code for Attach() will connect to the network if it
2473
          finds a device which is connected to the good local disks but
2474
          not network enabled
2475
      - wait for sync across all devices
2476
      - remove all disks from the old secondary
2477

2478
    Failures are not very well handled.
2479

2480
    """
2481
    steps_total = 6
2482

    
2483
    pnode = self.instance.primary_node
2484

    
2485
    # Step: check device activation
2486
    self.lu.LogStep(1, steps_total, "Check device existence")
2487
    self._CheckDisksExistence([self.instance.primary_node])
2488
    self._CheckVolumeGroup([self.instance.primary_node])
2489

    
2490
    # Step: check other node consistency
2491
    self.lu.LogStep(2, steps_total, "Check peer consistency")
2492
    self._CheckDisksConsistency(self.instance.primary_node, True, True)
2493

    
2494
    # Step: create new storage
2495
    self.lu.LogStep(3, steps_total, "Allocate new storage")
2496
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2497
    excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg,
2498
                                                  self.new_node_uuid)
2499
    for idx, dev in enumerate(disks):
2500
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
2501
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
2502
      # we pass force_create=True to force LVM creation
2503
      for new_lv in dev.children:
2504
        try:
2505
          _CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance,
2506
                               new_lv, True, GetInstanceInfoText(self.instance),
2507
                               False, excl_stor)
2508
        except errors.DeviceCreationError, e:
2509
          raise errors.OpExecError("Can't create block device: %s" % e.message)
2510

    
2511
    # Step 4: dbrd minors and drbd setups changes
2512
    # after this, we must manually remove the drbd minors on both the
2513
    # error and the success paths
2514
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2515
    minors = self.cfg.AllocateDRBDMinor([self.new_node_uuid
2516
                                         for _ in self.instance.disks],
2517
                                        self.instance.uuid)
2518
    logging.debug("Allocated minors %r", minors)
2519

    
2520
    iv_names = {}
2521
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
2522
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
2523
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
2524
      # create new devices on new_node; note that we create two IDs:
2525
      # one without port, so the drbd will be activated without
2526
      # networking information on the new node at this stage, and one
2527
      # with network, for the latter activation in step 4
2528
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
2529
      if self.instance.primary_node == o_node1:
2530
        p_minor = o_minor1
2531
      else:
2532
        assert self.instance.primary_node == o_node2, "Three-node instance?"
2533
        p_minor = o_minor2
2534

    
2535
      new_alone_id = (self.instance.primary_node, self.new_node_uuid, None,
2536
                      p_minor, new_minor, o_secret)
2537
      new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port,
2538
                    p_minor, new_minor, o_secret)
2539

    
2540
      iv_names[idx] = (dev, dev.children, new_net_id)
2541
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
2542
                    new_net_id)
2543
      new_drbd = objects.Disk(dev_type=constants.DT_DRBD8,
2544
                              logical_id=new_alone_id,
2545
                              children=dev.children,
2546
                              size=dev.size,
2547
                              params={})
2548
      (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
2549
                                            self.cfg)
2550
      try:
2551
        CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance,
2552
                             anno_new_drbd,
2553
                             GetInstanceInfoText(self.instance), False,
2554
                             excl_stor)
2555
      except errors.GenericError:
2556
        self.cfg.ReleaseDRBDMinors(self.instance.uuid)
2557
        raise
2558

    
2559
    # We have new devices, shutdown the drbd on the old secondary
2560
    for idx, dev in enumerate(self.instance.disks):
2561
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
2562
      self.cfg.SetDiskID(dev, self.target_node_uuid)
2563
      msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid,
2564
                                            (dev, self.instance)).fail_msg
2565
      if msg:
2566
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
2567
                           "node: %s" % (idx, msg),
2568
                           hint=("Please cleanup this device manually as"
2569
                                 " soon as possible"))
2570

    
2571
    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2572
    result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
2573
                                               self.instance.disks)[pnode]
2574

    
2575
    msg = result.fail_msg
2576
    if msg:
2577
      # detaches didn't succeed (unlikely)
2578
      self.cfg.ReleaseDRBDMinors(self.instance.uuid)
2579
      raise errors.OpExecError("Can't detach the disks from the network on"
2580
                               " old node: %s" % (msg,))
2581

    
2582
    # if we managed to detach at least one, we update all the disks of
2583
    # the instance to point to the new secondary
2584
    self.lu.LogInfo("Updating instance configuration")
2585
    for dev, _, new_logical_id in iv_names.itervalues():
2586
      dev.logical_id = new_logical_id
2587
      self.cfg.SetDiskID(dev, self.instance.primary_node)
2588

    
2589
    self.cfg.Update(self.instance, feedback_fn)
2590

    
2591
    # Release all node locks (the configuration has been updated)
2592
    ReleaseLocks(self.lu, locking.LEVEL_NODE)
2593

    
2594
    # and now perform the drbd attach
2595
    self.lu.LogInfo("Attaching primary drbds to new secondary"
2596
                    " (standalone => connected)")
2597
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2598
                                            self.new_node_uuid],
2599
                                           self.node_secondary_ip,
2600
                                           (self.instance.disks, self.instance),
2601
                                           self.instance.name,
2602
                                           False)
2603
    for to_node, to_result in result.items():
2604
      msg = to_result.fail_msg
2605
      if msg:
2606
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
2607
                           self.cfg.GetNodeName(to_node), msg,
2608
                           hint=("please do a gnt-instance info to see the"
2609
                                 " status of disks"))
2610

    
2611
    cstep = itertools.count(5)
2612

    
2613
    if self.early_release:
2614
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2615
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
2616
      # TODO: Check if releasing locks early still makes sense
2617
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2618
    else:
2619
      # Release all resource locks except those used by the instance
2620
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2621
                   keep=self.node_secondary_ip.keys())
2622

    
2623
    # TODO: Can the instance lock be downgraded here? Take the optional disk
2624
    # shutdown in the caller into consideration.
2625

    
2626
    # Wait for sync
2627
    # This can fail as the old devices are degraded and _WaitForSync
2628
    # does a combined result over all disks, so we don't check its return value
2629
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2630
    WaitForSync(self.lu, self.instance)
2631

    
2632
    # Check all devices manually
2633
    self._CheckDevices(self.instance.primary_node, iv_names)
2634

    
2635
    # Step: remove old storage
2636
    if not self.early_release:
2637
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2638
      self._RemoveOldStorage(self.target_node_uuid, iv_names)