lib/cmdlib/instance_storage.py @ 0c3d9c7c

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with storage of instances."""

import itertools
import logging
import os
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import ht
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import objects
from ganeti import utils
from ganeti import rpc
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
  AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \
  CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
  IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes, \
  CheckDiskTemplateEnabled
from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
  CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy

import ganeti.masterd.instance


_DISK_TEMPLATE_NAME_PREFIX = {
  constants.DT_PLAIN: "",
  constants.DT_RBD: ".rbd",
  constants.DT_EXT: ".ext",
  }


def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as an LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  """
  lu.cfg.SetDiskID(device, node_uuid)
  result = lu.rpc.call_blockdev_create(node_uuid, (device, instance),
                                       device.size, instance.name, force_open,
                                       info, excl_stor)
  result.Raise("Can't create block device %s on"
               " node %s for instance %s" % (device,
                                             lu.cfg.GetNodeName(node_uuid),
                                             instance.name))
  if device.physical_id is None:
    device.physical_id = result.payload


def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
                         info, force_open, excl_stor):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @attention: The device has to be annotated already.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has
      CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as an LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  @return: list of created devices
  """
  created_devices = []
  try:
    if device.CreateOnSecondary():
      force_create = True

    if device.children:
      for child in device.children:
        devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
                                    force_create, info, force_open, excl_stor)
        created_devices.extend(devs)

    if not force_create:
      return created_devices

    CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor)
    # The device has been completely created, so there is no point in keeping
    # its subdevices in the list. We just add the device itself instead.
    created_devices = [(node_uuid, device)]
    return created_devices

  except errors.DeviceCreationError, e:
    e.created_devices.extend(created_devices)
    raise e
  except errors.OpExecError, e:
    raise errors.DeviceCreationError(str(e), created_devices)


def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid):
  """Whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node_uuid: string
  @param node_uuid: The node UUID
  @rtype: bool
  @return: The effective value of exclusive_storage
  @raise errors.OpPrereqError: if no node exists with the given UUID

  """
  ni = cfg.GetNodeInfo(node_uuid)
  if ni is None:
    raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
                               errors.ECODE_NOENT)
  return IsExclusiveStorageEnabledNode(cfg, ni)


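# The helper above is typically evaluated for several candidate nodes at
# once, for instance to decide whether any of an instance's nodes runs with
# exclusive storage (compare the compat.any(...) use in
# LUInstanceRecreateDisks.CheckPrereq below). A minimal sketch of that
# pattern; the cfg and node_uuids arguments are assumed to be supplied by
# the caller:
def _ExampleAnyExclusiveStorage(cfg, node_uuids):
  return compat.any(IsExclusiveStorageEnabledNodeUuid(cfg, uuid)
                    for uuid in node_uuids)

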
def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
                    force_open):
  """Wrapper around L{_CreateBlockDevInner}.

  This method annotates the root device first.

  """
  (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
  excl_stor = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
  return _CreateBlockDevInner(lu, node_uuid, instance, disk, force_create, info,
                              force_open, excl_stor)


def _UndoCreateDisks(lu, disks_created, instance):
  """Undo the work performed by L{CreateDisks}.

  This function is called in case of an error to undo the work of
  L{CreateDisks}.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @param disks_created: the result returned by L{CreateDisks}
  @type instance: L{objects.Instance}
  @param instance: the instance for which disks were created

  """
  for (node_uuid, disk) in disks_created:
    lu.cfg.SetDiskID(disk, node_uuid)
    result = lu.rpc.call_blockdev_remove(node_uuid, (disk, instance))
    result.Warn("Failed to remove newly-created disk %s on node %s" %
                (disk, lu.cfg.GetNodeName(node_uuid)), logging.warning)


def CreateDisks(lu, instance, to_skip=None, target_node_uuid=None, disks=None):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @type to_skip: list
  @param to_skip: list of indices to skip
  @type target_node_uuid: string
  @param target_node_uuid: if passed, overrides the target node for creation
  @type disks: list of L{objects.Disk}
  @param disks: the disks to create; if not specified, all the disks of the
      instance are created
  @return: information about the created disks, to be used to call
      L{_UndoCreateDisks}
  @raise errors.OpPrereqError: in case of error

  """
  info = GetInstanceInfoText(instance)
  if target_node_uuid is None:
    pnode_uuid = instance.primary_node
    all_node_uuids = instance.all_nodes
  else:
    pnode_uuid = target_node_uuid
    all_node_uuids = [pnode_uuid]

  if disks is None:
    disks = instance.disks

  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), instance.disk_template)

  if instance.disk_template in constants.DTS_FILEBASED:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir)

    result.Raise("Failed to create directory '%s' on"
                 " node %s" % (file_storage_dir,
                               lu.cfg.GetNodeName(pnode_uuid)))

  disks_created = []
  for idx, device in enumerate(disks):
    if to_skip and idx in to_skip:
      continue
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
    for node_uuid in all_node_uuids:
      f_create = node_uuid == pnode_uuid
      try:
        _CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
                        f_create)
        disks_created.append((node_uuid, device))
      except errors.DeviceCreationError, e:
        logging.warning("Creating disk %s for instance '%s' failed",
                        idx, instance.name)
        disks_created.extend(e.created_devices)
        _UndoCreateDisks(lu, disks_created, instance)
        raise errors.OpExecError(e.message)
  return disks_created


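# The (node_uuid, disk) pairs returned by CreateDisks are exactly what
# _UndoCreateDisks expects, so callers can roll back if a later step fails.
# A minimal sketch of that contract (the lu and instance arguments are
# assumed to be provided by the caller; the wipe step merely stands in for
# any follow-up work):
def _ExampleCreateDisksWithRollback(lu, instance):
  disks_created = CreateDisks(lu, instance)
  try:
    WipeDisks(lu, instance)
  except errors.OpExecError:
    _UndoCreateDisks(lu, disks_created, instance)
    raise
  return disks_created

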
def ComputeDiskSizePerVG(disk_template, disks):
  """Compute disk size requirements in the volume group

  """
  def _compute(disks, payload):
    """Universal algorithm.

    """
    vgs = {}
    for disk in disks:
      vgs[disk[constants.IDISK_VG]] = \
        vgs.get(disk[constants.IDISK_VG], 0) + \
        disk[constants.IDISK_SIZE] + payload

    return vgs

  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: {},
    constants.DT_PLAIN: _compute(disks, 0),
    # 128 MiB are added for drbd metadata for each disk
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
    constants.DT_FILE: {},
    constants.DT_SHARED_FILE: {},
    }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]


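# A worked example of the aggregation above, with a hypothetical VG name
# and sizes in MiB: for DT_DRBD8 every disk adds constants.DRBD_META_SIZE
# on top of its own size, and disks sharing a VG are summed together.
def _ExampleDiskSizePerVG():
  disks = [
    {constants.IDISK_VG: "xenvg", constants.IDISK_SIZE: 10240},
    {constants.IDISK_VG: "xenvg", constants.IDISK_SIZE: 2048},
    ]
  # Expected: {"xenvg": 10240 + 2048 + 2 * constants.DRBD_META_SIZE}
  return ComputeDiskSizePerVG(constants.DT_DRBD8, disks)

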
def ComputeDisks(op, default_vg):
  """Computes the instance disks.

  @param op: The instance opcode
  @param default_vg: The default volume group to assume

  @return: The computed disks

  """
  disks = []
  for disk in op.disks:
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
    if mode not in constants.DISK_ACCESS_SET:
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                 mode, errors.ECODE_INVAL)
    size = disk.get(constants.IDISK_SIZE, None)
    if size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      size = int(size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                 errors.ECODE_INVAL)

    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
    if ext_provider and op.disk_template != constants.DT_EXT:
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
                                 " disk template, not %s" %
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
                                  op.disk_template), errors.ECODE_INVAL)

    data_vg = disk.get(constants.IDISK_VG, default_vg)
    name = disk.get(constants.IDISK_NAME, None)
    if name is not None and name.lower() == constants.VALUE_NONE:
      name = None
    new_disk = {
      constants.IDISK_SIZE: size,
      constants.IDISK_MODE: mode,
      constants.IDISK_VG: data_vg,
      constants.IDISK_NAME: name,
      }

    for key in [
      constants.IDISK_METAVG,
      constants.IDISK_ADOPT,
      constants.IDISK_SPINDLES,
      ]:
      if key in disk:
        new_disk[key] = disk[key]

    # For extstorage, demand the `provider' option and add any
    # additional parameters (ext-params) to the dict
    if op.disk_template == constants.DT_EXT:
      if ext_provider:
        new_disk[constants.IDISK_PROVIDER] = ext_provider
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            new_disk[key] = disk[key]
      else:
        raise errors.OpPrereqError("Missing provider for template '%s'" %
                                   constants.DT_EXT, errors.ECODE_INVAL)

    disks.append(new_disk)

  return disks


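# Sketch of the opcode-level disk specification consumed above. The size,
# access mode and VG name are hypothetical; unspecified fields are filled
# with defaults as shown in the expected result.
def _ExampleComputeDisks(op):
  # With op.disk_template == constants.DT_PLAIN and
  #   op.disks == [{constants.IDISK_SIZE: 10240}]
  # the call below returns
  #   [{constants.IDISK_SIZE: 10240, constants.IDISK_MODE: constants.DISK_RDWR,
  #     constants.IDISK_VG: "xenvg", constants.IDISK_NAME: None}]
  return ComputeDisks(op, "xenvg")

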
def CheckRADOSFreeSpace():
  """Compute disk size requirements inside the RADOS cluster.

  """
  # For the RADOS cluster we assume there is always enough space.
  pass


def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
                         iv_name, p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  assert len(vgnames) == len(names) == 2
  port = lu.cfg.AllocatePort()
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())

  dev_data = objects.Disk(dev_type=constants.DT_PLAIN, size=size,
                          logical_id=(vgnames[0], names[0]),
                          params={})
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  dev_meta = objects.Disk(dev_type=constants.DT_PLAIN,
                          size=constants.DRBD_META_SIZE,
                          logical_id=(vgnames[1], names[1]),
                          params={})
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  drbd_dev = objects.Disk(dev_type=constants.DT_DRBD8, size=size,
                          logical_id=(primary_uuid, secondary_uuid, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name, params={})
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  return drbd_dev


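# The DRBD8 disk built above carries its network identity in logical_id; a
# small helper showing how the six-tuple is laid out (it mirrors the
# unpacking done later in LUInstanceRecreateDisks.Exec):
def _ExampleDrbdLogicalId(drbd_dev):
  (primary_uuid, secondary_uuid, port, p_minor, s_minor, secret) = \
    drbd_dev.logical_id
  return {
    "nodes": (primary_uuid, secondary_uuid),
    "port": port,
    "minors": (p_minor, s_minor),
    "secret": secret,
    }

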
def GenerateDiskTemplate(
  lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
  disk_info, file_storage_dir, file_driver, base_index,
  feedback_fn, full_disk_params):
  """Generate the entire disk layout for a given template type.

  """
  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []

  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), template_name)

  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_DRBD8:
    if len(secondary_node_uuids) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node_uuid = secondary_node_uuids[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node_uuid, remote_node_uuid] * len(disk_info), instance_uuid)

    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
                                                       full_disk_params)
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]

    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      data_vg = disk.get(constants.IDISK_VG, vgname)
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
      disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
                                      disk[constants.IDISK_SIZE],
                                      [data_vg, meta_vg],
                                      names[idx * 2:idx * 2 + 2],
                                      "disk/%d" % disk_index,
                                      minors[idx * 2], minors[idx * 2 + 1])
      disk_dev.mode = disk[constants.IDISK_MODE]
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disks.append(disk_dev)
  else:
    if secondary_node_uuids:
      raise errors.ProgrammerError("Wrong template configuration")

    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
    if name_prefix is None:
      names = None
    else:
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
                                        (name_prefix, base_index + i)
                                        for i in range(disk_count)])

    if template_name == constants.DT_PLAIN:

      def logical_id_fn(idx, _, disk):
        vg = disk.get(constants.IDISK_VG, vgname)
        return (vg, names[idx])

    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
      logical_id_fn = \
        lambda _, disk_index, disk: (file_driver,
                                     "%s/disk%d" % (file_storage_dir,
                                                    disk_index))
    elif template_name == constants.DT_BLOCK:
      logical_id_fn = \
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
                                       disk[constants.IDISK_ADOPT])
    elif template_name == constants.DT_RBD:
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
    elif template_name == constants.DT_EXT:
      def logical_id_fn(idx, _, disk):
        provider = disk.get(constants.IDISK_PROVIDER, None)
        if provider is None:
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
                                       " not found", constants.DT_EXT,
                                       constants.IDISK_PROVIDER)
        return (provider, names[idx])
    else:
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)

    dev_type = template_name

    for idx, disk in enumerate(disk_info):
      params = {}
      # Only for the Ext template add disk_info to params
      if template_name == constants.DT_EXT:
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            params[key] = disk[key]
      disk_index = idx + base_index
      size = disk[constants.IDISK_SIZE]
      feedback_fn("* disk %s, size %s" %
                  (disk_index, utils.FormatUnit(size, "h")))
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
                              logical_id=logical_id_fn(idx, disk_index, disk),
                              iv_name="disk/%d" % disk_index,
                              mode=disk[constants.IDISK_MODE],
                              params=params,
                              spindles=disk.get(constants.IDISK_SPINDLES))
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
      disks.append(disk_dev)

  return disks


def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
  """Check the presence of the spindle options with exclusive_storage.

  @type diskdict: dict
  @param diskdict: disk parameters
  @type es_flag: bool
  @param es_flag: the effective value of the exclusive_storage flag
  @type required: bool
  @param required: whether spindles are required or just optional
  @raise errors.OpPrereqError: when spindles are given and they should not

  """
  if (not es_flag and constants.IDISK_SPINDLES in diskdict and
      diskdict[constants.IDISK_SPINDLES] is not None):
    raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
                               " when exclusive storage is not active",
                               errors.ECODE_INVAL)
  if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or
                                diskdict[constants.IDISK_SPINDLES] is None)):
    raise errors.OpPrereqError("You must specify spindles in instance disks"
                               " when exclusive storage is active",
                               errors.ECODE_INVAL)


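# A quick illustration of the check above, with a hypothetical spindle count:
def _ExampleSpindleChecks():
  # Accepted: exclusive storage active and spindles provided.
  CheckSpindlesExclusiveStorage({constants.IDISK_SPINDLES: 2}, True, True)
  # Raises OpPrereqError: spindles given while exclusive storage is off.
  CheckSpindlesExclusiveStorage({constants.IDISK_SPINDLES: 2}, False, False)

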
class LUInstanceRecreateDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  _MODIFYABLE = compat.UniqueFrozenset([
    constants.IDISK_SIZE,
    constants.IDISK_MODE,
    constants.IDISK_SPINDLES,
    ])

  # New or changed disk parameters may have different semantics
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
    constants.IDISK_ADOPT,

    # TODO: Implement support changing VG while recreating
    constants.IDISK_VG,
    constants.IDISK_METAVG,
    constants.IDISK_PROVIDER,
    constants.IDISK_NAME,
    ]))

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)

    # FIXME
    # The allocator should actually run in "relocate" mode, but current
    # allocators don't support relocating all the nodes of an instance at
    # the same time. As a workaround we use "allocate" mode, but this is
    # suboptimal for two reasons:
    # - The instance name passed to the allocator is present in the list of
    #   existing instances, so there could be a conflict within the
    #   internal structures of the allocator. This doesn't happen with the
    #   current allocators, but it's a liability.
    # - The allocator counts the resources used by the instance twice: once
    #   because the instance exists already, and once because it tries to
    #   allocate a new instance.
    # The allocator could choose some of the nodes on which the instance is
    # running, but that's not a problem. If the instance nodes are broken,
    # they should already be marked as drained or offline, and hence
    # skipped by the allocator. If instance disks have been lost for other
    # reasons, then recreating the disks on the same nodes should be fine.
    disk_template = self.instance.disk_template
    spindle_use = be_full[constants.BE_SPINDLE_USE]
    disks = [{
      constants.IDISK_SIZE: d.size,
      constants.IDISK_MODE: d.mode,
      constants.IDISK_SPINDLES: d.spindles,
      } for d in self.instance.disks]
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
                                        disk_template=disk_template,
                                        tags=list(self.instance.GetTags()),
                                        os=self.instance.os,
                                        nics=[{}],
                                        vcpus=be_full[constants.BE_VCPUS],
                                        memory=be_full[constants.BE_MAXMEM],
                                        spindle_use=spindle_use,
                                        disks=disks,
                                        hypervisor=self.instance.hypervisor,
                                        node_whitelist=None)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    assert req.RequiredNodes() == len(self.instance.all_nodes)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 utils.CommaJoin(self.op.nodes))

  def CheckArguments(self):
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
      # Normalize and convert deprecated list of disk indices
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]

    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
    if duplicates:
      raise errors.OpPrereqError("Some disks have been specified more than"
                                 " once: %s" % utils.CommaJoin(duplicates),
                                 errors.ECODE_INVAL)

    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
    # when neither iallocator nor nodes are specified
    if self.op.iallocator or self.op.nodes:
      CheckIAllocatorOrNode(self, "iallocator", "nodes")

    for (idx, params) in self.op.disks:
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
      if unsupported:
        raise errors.OpPrereqError("Parameters for disk %s try to change"
                                   " unmodifyable parameter(s): %s" %
                                   (idx, utils.CommaJoin(unsupported)),
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    if self.op.nodes:
      (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      if self.op.iallocator:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.iallocator is not None
      assert not self.op.nodes
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock the primary group used by the instance optimistically; this
      # requires going via the node before it's locked, requiring
      # verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, primary_only=True)

    elif level == locking.LEVEL_NODE:
      # If an allocator is used, then we lock all the nodes in the current
      # instance group, as we don't know yet which ones will be selected;
      # if we replace the nodes without using an allocator, locks are
      # already declared in ExpandNames; otherwise, we need to lock all the
      # instance nodes for disk re-creation
      if self.op.iallocator:
        assert not self.op.nodes
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1

        # Lock member nodes of the group of the primary node
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
          self.needed_locks[locking.LEVEL_NODE].extend(
            self.cfg.GetNodeGroup(group_uuid).members)

        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
      elif not self.op.nodes:
        self._LockInstancesNodes(primary_only=False)
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    return BuildInstanceHookEnvByObject(self, self.instance)

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    if self.op.node_uuids:
      if len(self.op.node_uuids) != len(instance.all_nodes):
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
                                   " %d replacement nodes were specified" %
                                   (instance.name, len(instance.all_nodes),
                                    len(self.op.node_uuids)),
                                   errors.ECODE_INVAL)
      assert instance.disk_template != constants.DT_DRBD8 or \
             len(self.op.node_uuids) == 2
      assert instance.disk_template != constants.DT_PLAIN or \
             len(self.op.node_uuids) == 1
      primary_node = self.op.node_uuids[0]
    else:
      primary_node = instance.primary_node
    if not self.op.iallocator:
      CheckNodeOnline(self, primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name, errors.ECODE_INVAL)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      # Node group locks are acquired only for the primary node (and only
      # when the allocator is used)
      CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
                              primary_only=True)

    # if we replace nodes *and* the old primary is offline, we don't
    # check the instance state
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
    if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                         msg="cannot recreate disks")

    if self.op.disks:
      self.disks = dict(self.op.disks)
    else:
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))

    maxidx = max(self.disks.keys())
    if maxidx >= len(instance.disks):
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
                                 errors.ECODE_INVAL)

    if ((self.op.node_uuids or self.op.iallocator) and
         sorted(self.disks.keys()) != range(len(instance.disks))):
      raise errors.OpPrereqError("Can't recreate disks partially and"
                                 " change the nodes at the same time",
                                 errors.ECODE_INVAL)

    self.instance = instance

    if self.op.iallocator:
      self._RunAllocator()
      # Release unneeded node and node resource locks
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)

    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

    if self.op.node_uuids:
      node_uuids = self.op.node_uuids
    else:
      node_uuids = instance.all_nodes
    excl_stor = compat.any(
      rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
      )
    for new_params in self.disks.values():
      CheckSpindlesExclusiveStorage(new_params, excl_stor, False)

  def Exec(self, feedback_fn):
    """Recreate the disks.

    """
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    to_skip = []
    mods = [] # keeps track of needed changes

    for idx, disk in enumerate(self.instance.disks):
      try:
        changes = self.disks[idx]
      except KeyError:
        # Disk should not be recreated
        to_skip.append(idx)
        continue

      # update secondaries for disks, if needed
      if self.op.node_uuids and disk.dev_type == constants.DT_DRBD8:
        # need to update the nodes and minors
        assert len(self.op.node_uuids) == 2
        assert len(disk.logical_id) == 6 # otherwise disk internals
                                         # have changed
        (_, _, old_port, _, _, old_secret) = disk.logical_id
        new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
                                                self.instance.uuid)
        new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
                  new_minors[0], new_minors[1], old_secret)
        assert len(disk.logical_id) == len(new_id)
      else:
        new_id = None

      mods.append((idx, new_id, changes))

    # now that we have passed all asserts above, we can apply the mods
    # in a single run (to avoid partial changes)
    for idx, new_id, changes in mods:
      disk = self.instance.disks[idx]
      if new_id is not None:
        assert disk.dev_type == constants.DT_DRBD8
        disk.logical_id = new_id
      if changes:
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
                    mode=changes.get(constants.IDISK_MODE, None),
                    spindles=changes.get(constants.IDISK_SPINDLES, None))

    # change primary node, if needed
    if self.op.node_uuids:
      self.instance.primary_node = self.op.node_uuids[0]
      self.LogWarning("Changing the instance's nodes, you will have to"
                      " remove any disks left on the older nodes manually")

    if self.op.node_uuids:
      self.cfg.Update(self.instance, feedback_fn)

    # All touched nodes must be locked
    mylocks = self.owned_locks(locking.LEVEL_NODE)
    assert mylocks.issuperset(frozenset(self.instance.all_nodes))
    new_disks = CreateDisks(self, self.instance, to_skip=to_skip)

    # TODO: Release node locks before wiping, or explain why it's not possible
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
      wipedisks = [(idx, disk, 0)
                   for (idx, disk) in enumerate(self.instance.disks)
                   if idx not in to_skip]
      WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
                         cleanup=new_disks)


def _PerformNodeInfoCall(lu, node_uuids, vg):
  """Prepares the input and performs a node info call.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: list of string
  @param node_uuids: list of node UUIDs to perform the call for
  @type vg: string
  @param vg: the volume group's name

  """
  lvm_storage_units = [(constants.ST_LVM_VG, vg)]
  storage_units = rpc.PrepareStorageUnitsForNodes(lu.cfg, lvm_storage_units,
                                                  node_uuids)
  hvname = lu.cfg.GetHypervisorType()
  hvparams = lu.cfg.GetClusterInfo().hvparams
  nodeinfo = lu.rpc.call_node_info(node_uuids, storage_units,
                                   [(hvname, hvparams[hvname])])
  return nodeinfo


def _CheckVgCapacityForNode(node_name, node_info, vg, requested):
  """Checks the vg capacity for a given node.

  @type node_info: tuple (_, list of dicts, _)
  @param node_info: the result of the node info call for one node
  @type node_name: string
  @param node_name: the name of the node
  @type vg: string
  @param vg: volume group name
  @type requested: int
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  (_, space_info, _) = node_info
  lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType(
      space_info, constants.ST_LVM_VG)
  if not lvm_vg_info:
    raise errors.OpPrereqError("Can't retrieve storage information for LVM")
  vg_free = lvm_vg_info.get("storage_free", None)
  if not isinstance(vg_free, int):
    raise errors.OpPrereqError("Can't compute free disk space on node"
                               " %s for vg %s, result was '%s'" %
                               (node_name, vg, vg_free), errors.ECODE_ENVIRON)
  if requested > vg_free:
    raise errors.OpPrereqError("Not enough disk space on target node %s"
                               " vg %s: required %d MiB, available %d MiB" %
                               (node_name, vg, requested, vg_free),
                               errors.ECODE_NORES)


def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
  """Checks if nodes have enough free disk space in the specified VG.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type vg: C{str}
  @param vg: the volume group to check
  @type requested: C{int}
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  nodeinfo = _PerformNodeInfoCall(lu, node_uuids, vg)
  for node_uuid in node_uuids:
    node_name = lu.cfg.GetNodeName(node_uuid)
    info = nodeinfo[node_uuid]
    info.Raise("Cannot get current information from node %s" % node_name,
               prereq=True, ecode=errors.ECODE_ENVIRON)
    _CheckVgCapacityForNode(node_name, info.payload, vg, requested)


def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
  """Checks if nodes have enough free disk space in all the VGs.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type req_sizes: C{dict}
  @param req_sizes: the hash of vg and corresponding amount of disk in
      MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  for vg, req_size in req_sizes.items():
    _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, req_size)


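# The req_sizes mapping checked above is normally the output of
# ComputeDiskSizePerVG; a compact sketch of the two calls chained together
# (lu, node_uuids, disk_template and disks are assumed to come from the
# calling logical unit):
def _ExampleCheckFreeDiskForNewDisks(lu, node_uuids, disk_template, disks):
  req_sizes = ComputeDiskSizePerVG(disk_template, disks)
  CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes)

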
def _DiskSizeInBytesToMebibytes(lu, size):
  """Converts a disk size in bytes to mebibytes.

  Warns and rounds up if the size isn't an even multiple of 1 MiB.

  """
  (mib, remainder) = divmod(size, 1024 * 1024)

  if remainder != 0:
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
                  " to not overwrite existing data (%s bytes will not be"
                  " wiped)", (1024 * 1024) - remainder)
    mib += 1

  return mib


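# Rounding behaviour of the helper above for a couple of hypothetical sizes
# (the lu argument only needs to provide LogWarning):
def _ExampleSizeRounding(lu):
  # 5 GiB exactly -> 5120 MiB, no warning emitted.
  exact = _DiskSizeInBytesToMebibytes(lu, 5 * 1024 * 1024 * 1024)
  # 5 GiB plus 512 KiB -> warning, rounded up to 5121 MiB.
  rounded = _DiskSizeInBytesToMebibytes(lu,
                                        5 * 1024 * 1024 * 1024 + 512 * 1024)
  return (exact, rounded)

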
def _CalcEta(time_taken, written, total_size):
  """Calculates the ETA based on size written and total size.

  @param time_taken: The time taken so far
  @param written: amount written so far
  @param total_size: The total size of data to be written
  @return: The remaining time in seconds

  """
  avg_time = time_taken / float(written)
  return (total_size - written) * avg_time


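# The ETA above is a plain linear extrapolation; for example, 256 MiB
# written in 32 seconds out of 1024 MiB leaves
# (1024 - 256) * (32 / 256.0) = 96 seconds.
def _ExampleEta():
  return _CalcEta(32.0, 256, 1024)

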
def WipeDisks(lu, instance, disks=None):
  """Wipes instance disks.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
  @param disks: Disk details; tuple contains disk index, disk object and the
    start offset

  """
  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  if disks is None:
    disks = [(idx, disk, 0)
             for (idx, disk) in enumerate(instance.disks)]

  for (_, device, _) in disks:
    lu.cfg.SetDiskID(device, node_uuid)

  logging.info("Pausing synchronization of disks of instance '%s'",
               instance.name)
  result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                  (map(compat.snd, disks),
                                                   instance),
                                                  True)
  result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)

  for idx, success in enumerate(result.payload):
    if not success:
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
                   " failed", idx, instance.name)

  try:
    for (idx, device, offset) in disks:
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
      wipe_chunk_size = \
        int(min(constants.MAX_WIPE_CHUNK,
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))

      size = device.size
      last_output = 0
      start_time = time.time()

      if offset == 0:
        info_text = ""
      else:
        info_text = (" (from %s to %s)" %
                     (utils.FormatUnit(offset, "h"),
                      utils.FormatUnit(size, "h")))

      lu.LogInfo("* Wiping disk %s%s", idx, info_text)

      logging.info("Wiping disk %d for instance %s on node %s using"
                   " chunk size %s", idx, instance.name, node_name,
                   wipe_chunk_size)

      while offset < size:
        wipe_size = min(wipe_chunk_size, size - offset)

        logging.debug("Wiping disk %d, offset %s, chunk %s",
                      idx, offset, wipe_size)

        result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
                                           offset, wipe_size)
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
                     (idx, offset, wipe_size))

        now = time.time()
        offset += wipe_size
        if now - last_output >= 60:
          eta = _CalcEta(now - start_time, offset, size)
          lu.LogInfo(" - done: %.1f%% ETA: %s",
                     offset / float(size) * 100, utils.FormatSeconds(eta))
          last_output = now
  finally:
    logging.info("Resuming synchronization of disks for instance '%s'",
                 instance.name)

    result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                    (map(compat.snd, disks),
                                                     instance),
                                                    False)

    if result.fail_msg:
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
                    node_name, result.fail_msg)
    else:
      for idx, success in enumerate(result.payload):
        if not success:
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
                        " failed", idx, instance.name)


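# The per-disk chunk size used above, restated as a standalone helper for a
# hypothetical disk size in MiB (the two constants bound the chunk to a
# percentage of the disk and to an absolute maximum):
def _ExampleWipeChunkSize(disk_size_mib):
  return int(min(constants.MAX_WIPE_CHUNK,
                 disk_size_mib / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))

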
def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
  """Wrapper for L{WipeDisks} that handles errors.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @param disks: see L{WipeDisks}
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
      case of error
  @raise errors.OpExecError: in case of failure

  """
  try:
    WipeDisks(lu, instance, disks=disks)
  except errors.OpExecError:
    logging.warning("Wiping disks for instance '%s' failed",
                    instance.name)
    _UndoCreateDisks(lu, cleanup, instance)
    raise


def ExpandCheckDisks(instance, disks):
  """Return the instance disks selected by the disks list

  @type disks: list of L{objects.Disk} or None
  @param disks: selected disks
  @rtype: list of L{objects.Disk}
  @return: selected instance disks to act on

  """
  if disks is None:
    return instance.disks
  else:
    if not set(disks).issubset(instance.disks):
      raise errors.ProgrammerError("Can only act on disks belonging to the"
                                   " target instance: expected a subset of %r,"
                                   " got %r" % (instance.disks, disks))
    return disks


def WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)

  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  for dev in disks:
    lu.cfg.SetDiskID(dev, node_uuid)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node_name)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node_name, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.LogInfo("- device %s: %5.2f%% done, %s",
                   disks[i].iv_name, mstat.sync_percent, rem_time)

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.LogInfo("Instance %s's disks are in sync", instance.name)

  return not cumul_degraded


def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  Errors on the primary node make the result unsuccessful unless
  ignore_primary is true, in which case they are ignored.

  """
  lu.cfg.MarkInstanceDisksInactive(instance.uuid)
  all_result = True
  disks = ExpandCheckDisks(instance, disks)

  for disk in disks:
    for node_uuid, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node_uuid)
      result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        if ((node_uuid == instance.primary_node and not ignore_primary) or
            (node_uuid != instance.primary_node and not result.offline)):
          all_result = False
  return all_result


def _SafeShutdownInstanceDisks(lu, instance, disks=None):
  """Shutdown block devices of an instance.

  This function checks that the instance is down before calling
  L{ShutdownInstanceDisks}.

  """
  CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
  ShutdownInstanceDisks(lu, instance, disks=disks)


1253
def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1254
                           ignore_size=False):
1255
  """Prepare the block devices for an instance.
1256

1257
  This sets up the block devices on all nodes.
1258

1259
  @type lu: L{LogicalUnit}
1260
  @param lu: the logical unit on whose behalf we execute
1261
  @type instance: L{objects.Instance}
1262
  @param instance: the instance for whose disks we assemble
1263
  @type disks: list of L{objects.Disk} or None
1264
  @param disks: which disks to assemble (or all, if None)
1265
  @type ignore_secondaries: boolean
1266
  @param ignore_secondaries: if true, errors on secondary nodes
1267
      won't result in an error return from the function
1268
  @type ignore_size: boolean
1269
  @param ignore_size: if true, the current known size of the disk
1270
      will not be used during the disk activation, useful for cases
1271
      when the size is wrong
1272
  @return: False if the operation failed, otherwise a list of
1273
      (host, instance_visible_name, node_visible_name)
1274
      with the mapping from node devices to instance devices
1275

1276
  """
1277
  device_info = []
1278
  disks_ok = True
1279
  disks = ExpandCheckDisks(instance, disks)
1280

    
1281
  # With the two passes mechanism we try to reduce the window of
1282
  # opportunity for the race condition of switching DRBD to primary
1283
  # before handshaking occured, but we do not eliminate it
1284

    
1285
  # The proper fix would be to wait (with some limits) until the
1286
  # connection has been made and drbd transitions from WFConnection
1287
  # into any other network-connected state (Connected, SyncTarget,
1288
  # SyncSource, etc.)
1289

    
1290
  # mark instance disks as active before doing actual work, so watcher does
1291
  # not try to shut them down erroneously
1292
  lu.cfg.MarkInstanceDisksActive(instance.uuid)
1293

    
1294
  # 1st pass, assemble on all nodes in secondary mode
1295
  for idx, inst_disk in enumerate(disks):
1296
    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
1297
                                  instance.primary_node):
1298
      if ignore_size:
1299
        node_disk = node_disk.Copy()
1300
        node_disk.UnsetSize()
1301
      lu.cfg.SetDiskID(node_disk, node_uuid)
1302
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
1303
                                             instance.name, False, idx)
1304
      msg = result.fail_msg
1305
      if msg:
1306
        is_offline_secondary = (node_uuid in instance.secondary_nodes and
1307
                                result.offline)
1308
        lu.LogWarning("Could not prepare block device %s on node %s"
1309
                      " (is_primary=False, pass=1): %s",
1310
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1311
        if not (ignore_secondaries or is_offline_secondary):
1312
          disks_ok = False
1313

    
1314
  # FIXME: race condition on drbd migration to primary
1315

    
1316
  # 2nd pass, do only the primary node
1317
  for idx, inst_disk in enumerate(disks):
1318
    dev_path = None
1319

    
1320
    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
1321
                                  instance.primary_node):
1322
      if node_uuid != instance.primary_node:
1323
        continue
1324
      if ignore_size:
1325
        node_disk = node_disk.Copy()
1326
        node_disk.UnsetSize()
1327
      lu.cfg.SetDiskID(node_disk, node_uuid)
1328
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
1329
                                             instance.name, True, idx)
1330
      msg = result.fail_msg
1331
      if msg:
1332
        lu.LogWarning("Could not prepare block device %s on node %s"
1333
                      " (is_primary=True, pass=2): %s",
1334
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1335
        disks_ok = False
1336
      else:
1337
        dev_path = result.payload
1338

    
1339
    device_info.append((lu.cfg.GetNodeName(instance.primary_node),
1340
                        inst_disk.iv_name, dev_path))
1341

    
1342
  # leave the disks configured for the primary node
1343
  # this is a workaround that would be fixed better by
1344
  # improving the logical/physical id handling
1345
  for disk in disks:
1346
    lu.cfg.SetDiskID(disk, instance.primary_node)
1347

    
1348
  if not disks_ok:
1349
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1350

    
1351
  return disks_ok, device_info
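

# Hedged usage sketch (editorial addition, not part of the original module):
# callers typically unpack the pair returned by AssembleInstanceDisks and only
# look at the device paths on success, e.g.:
#
#   disks_ok, device_info = AssembleInstanceDisks(lu, instance)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")
#   for node_name, iv_name, dev_path in device_info:
#     lu.LogInfo("%s: %s is visible as %s", node_name, iv_name, dev_path)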


def StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = AssembleInstanceDisks(lu, instance,
                                      ignore_secondaries=force)
  if not disks_ok:
    ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.LogWarning("",
                    hint=("If the message above refers to a secondary node,"
                          " you can retry the operation using '--force'"))
    raise errors.OpExecError("Disk consistency error")


class LUInstanceGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.needed_locks[locking.LEVEL_NODE_RES] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      "ABSOLUTE": self.op.absolute,
      }
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    node_uuids = list(self.instance.all_nodes)
    for node_uuid in node_uuids:
      CheckNodeOnline(self, node_uuid)
    self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)

    if self.instance.disk_template not in constants.DTS_GROWABLE:
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing", errors.ECODE_INVAL)

    self.disk = self.instance.FindDisk(self.op.disk)

    if self.op.absolute:
      self.target = self.op.amount
      self.delta = self.target - self.disk.size
      if self.delta < 0:
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
                                   "current disk size (%s)" %
                                   (utils.FormatUnit(self.target, "h"),
                                    utils.FormatUnit(self.disk.size, "h")),
                                   errors.ECODE_STATE)
    else:
      self.delta = self.op.amount
      self.target = self.disk.size + self.delta
      if self.delta < 0:
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
                                   utils.FormatUnit(self.delta, "h"),
                                   errors.ECODE_INVAL)

    self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta))

  def _CheckDiskSpace(self, node_uuids, req_vgspace):
    template = self.instance.disk_template
    if (template not in (constants.DTS_NO_FREE_SPACE_CHECK) and
        not any(self.node_es_flags.values())):
      # TODO: check the free disk space for file, when that feature will be
      # supported
      # With exclusive storage we need to do something smarter than just looking
      # at free space, which, in the end, is basically a dry run. So we rely on
      # the dry run performed in Exec() instead.
      CheckNodesFreeDiskPerVG(self, node_uuids, req_vgspace)
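
  # Illustrative example of the size computation in CheckPrereq above
  # (editorial addition, not part of the original module).  For a disk that is
  # currently 10240 MiB:
  #   - relative grow, amount=2048:  delta = 2048,  target = 12288
  #   - absolute grow, amount=20480: delta = 10240, target = 20480
  #   - an absolute request below the current size (e.g. amount=8192) yields a
  #     negative delta and is rejected with ECODE_STATE.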

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks

    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[self.disk])
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block device to grow")

    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
                (self.op.disk, self.instance.name,
                 utils.FormatUnit(self.delta, "h"),
                 utils.FormatUnit(self.target, "h")))

    # First run all grow ops in dry-run mode
    for node_uuid in self.instance.all_nodes:
      self.cfg.SetDiskID(self.disk, node_uuid)
      result = self.rpc.call_blockdev_grow(node_uuid,
                                           (self.disk, self.instance),
                                           self.delta, True, True,
                                           self.node_es_flags[node_uuid])
      result.Raise("Dry-run grow request failed to node %s" %
                   self.cfg.GetNodeName(node_uuid))

    if wipe_disks:
      # Get disk size from primary node for wiping
      self.cfg.SetDiskID(self.disk, self.instance.primary_node)
      result = self.rpc.call_blockdev_getdimensions(
                 self.instance.primary_node, ([self.disk], self.instance))
      result.Raise("Failed to retrieve disk size from node '%s'" %
                   self.instance.primary_node)

      (disk_dimensions, ) = result.payload

      if disk_dimensions is None:
        raise errors.OpExecError("Failed to retrieve disk size from primary"
                                 " node '%s'" % self.instance.primary_node)
      (disk_size_in_bytes, _) = disk_dimensions

      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)

      assert old_disk_size >= self.disk.size, \
        ("Retrieved disk size too small (got %s, should be at least %s)" %
         (old_disk_size, self.disk.size))
    else:
      old_disk_size = None

    # We know that (as far as we can test) operations across different
    # nodes will succeed, time to run it for real on the backing storage
    for node_uuid in self.instance.all_nodes:
      self.cfg.SetDiskID(self.disk, node_uuid)
      result = self.rpc.call_blockdev_grow(node_uuid,
                                           (self.disk, self.instance),
                                           self.delta, False, True,
                                           self.node_es_flags[node_uuid])
      result.Raise("Grow request failed to node %s" %
                   self.cfg.GetNodeName(node_uuid))

    # And now execute it for logical storage, on the primary node
    node_uuid = self.instance.primary_node
    self.cfg.SetDiskID(self.disk, node_uuid)
    result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance),
                                         self.delta, False, False,
                                         self.node_es_flags[node_uuid])
    result.Raise("Grow request failed to node %s" %
                 self.cfg.GetNodeName(node_uuid))

    self.disk.RecordGrow(self.delta)
    self.cfg.Update(self.instance, feedback_fn)

    # Changes have been recorded, release node lock
    ReleaseLocks(self, locking.LEVEL_NODE)

    # Downgrade lock while waiting for sync
    self.glm.downgrade(locking.LEVEL_INSTANCE)

    assert wipe_disks ^ (old_disk_size is None)

    if wipe_disks:
      assert self.instance.disks[self.op.disk] == self.disk

      # Wipe newly added disk space
      WipeDisks(self, self.instance,
                disks=[(self.op.disk, self.disk, old_disk_size)])

    if self.op.wait_for_sync:
      disk_abort = not WaitForSync(self, self.instance, disks=[self.disk])
      if disk_abort:
        self.LogWarning("Disk syncing has not returned a good status; check"
                        " the instance")
      if not self.instance.disks_active:
        _SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk])
    elif not self.instance.disks_active:
      self.LogWarning("Not shutting down the disk even if the instance is"
                      " not supposed to be running because no wait for"
                      " sync mode was requested")

    assert self.owned_locks(locking.LEVEL_NODE_RES)
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
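

# Hedged usage sketch (editorial addition, not part of the original module):
# disk growing is normally driven through the opcode layer, roughly as below;
# the exact opcode fields live in opcodes.py and are assumptions here:
#
#   op = opcodes.OpInstanceGrowDisk(instance_name="inst1.example.com",
#                                   disk=0, amount=2048, absolute=False,
#                                   wait_for_sync=True)
#
# The LU above then validates the request in CheckPrereq and performs a
# dry-run grow on all nodes before growing the real backing storage.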


class LUInstanceReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check arguments.

    """
    if self.op.mode == constants.REPLACE_DISK_CHG:
      if self.op.remote_node is None and self.op.iallocator is None:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given", errors.ECODE_INVAL)
      else:
        CheckIAllocatorOrNode(self, "iallocator", "remote_node")

    elif self.op.remote_node is not None or self.op.iallocator is not None:
      # Not replacing the secondary
      raise errors.OpPrereqError("The iallocator and new node options can"
                                 " only be used when changing the"
                                 " secondary node", errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    assert locking.LEVEL_NODE not in self.needed_locks
    assert locking.LEVEL_NODE_RES not in self.needed_locks
    assert locking.LEVEL_NODEGROUP not in self.needed_locks

    assert self.op.iallocator is None or self.op.remote_node is None, \
      "Conflicting options"

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8Dev.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

      if self.op.iallocator is not None:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

    self.replacer = TLReplaceDisks(self, self.op.instance_uuid,
                                   self.op.instance_name, self.op.mode,
                                   self.op.iallocator, self.op.remote_node_uuid,
                                   self.op.disks, self.op.early_release,
                                   self.op.ignore_ipolicy)

    self.tasklets = [self.replacer]

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.remote_node_uuid is None
      assert self.op.iallocator is not None
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock all groups used by instance optimistically; this requires going
      # via the node before it's locked, requiring verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid)

    elif level == locking.LEVEL_NODE:
      if self.op.iallocator is not None:
        assert self.op.remote_node_uuid is None
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)

        # Lock member nodes of all locked groups
        self.needed_locks[locking.LEVEL_NODE] = \
          [node_uuid
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
           for node_uuid in self.cfg.GetNodeGroup(group_uuid).members]
      else:
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

        self._LockInstancesNodes()

    elif level == locking.LEVEL_NODE_RES:
      # Reuse node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        self.needed_locks[locking.LEVEL_NODE]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    instance = self.replacer.instance
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.cfg.GetNodeName(instance.secondary_nodes[0]),
      }
    env.update(BuildInstanceHookEnvByObject(self, instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self.replacer.instance
    nl = [
      self.cfg.GetMasterNode(),
      instance.primary_node,
      ]
    if self.op.remote_node_uuid is not None:
      nl.append(self.op.remote_node_uuid)
    return nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
            self.op.iallocator is None)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      CheckInstanceNodeGroups(self.cfg, self.op.instance_uuid, owned_groups)

    return LogicalUnit.CheckPrereq(self)
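

# Hedged usage sketch (editorial addition, not part of the original module):
# a secondary-change request would be submitted roughly as below; the exact
# opcode fields live in opcodes.py and are assumptions here:
#
#   op = opcodes.OpInstanceReplaceDisks(instance_name="inst1.example.com",
#                                       mode=constants.REPLACE_DISK_CHG,
#                                       remote_node="node3.example.com",
#                                       early_release=False)
#
# CheckArguments above enforces that REPLACE_DISK_CHG comes with either a
# remote node or an iallocator, and that the other modes come with neither.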


class LUInstanceActivateDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              AssembleInstanceDisks(self, self.instance,
                                    ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    if self.op.wait_for_sync:
      if not WaitForSync(self, self.instance):
        self.cfg.MarkInstanceDisksInactive(self.instance.uuid)
        raise errors.OpExecError("Some disks of the instance are degraded!")

    return disks_info


class LUInstanceDeactivateDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    if self.op.force:
      ShutdownInstanceDisks(self, self.instance)
    else:
      _SafeShutdownInstanceDisks(self, self.instance)


def _CheckDiskConsistencyInner(lu, instance, dev, node_uuid, on_primary,
                               ldisk=False):
  """Check that mirrors are not degraded.

  @attention: The device has to be annotated already.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node_uuid)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node_uuid, (dev, instance))
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s",
                    lu.cfg.GetNodeName(node_uuid), msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid))
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistencyInner(lu, instance, child,
                                                     node_uuid, on_primary)

  return result


def CheckDiskConsistency(lu, instance, dev, node_uuid, on_primary, ldisk=False):
  """Wrapper around L{_CheckDiskConsistencyInner}.

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return _CheckDiskConsistencyInner(lu, instance, disk, node_uuid, on_primary,
                                    ldisk=ldisk)


def _BlockdevFind(lu, node_uuid, dev, instance):
  """Wrapper around call_blockdev_find to annotate diskparams.

  @param lu: A reference to the lu object
  @param node_uuid: The node to call out
  @param dev: The device to find
  @param instance: The instance object the device belongs to
  @return: The result of the rpc call

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return lu.rpc.call_blockdev_find(node_uuid, (disk, instance))


def _GenerateUniqueNames(lu, exts):
  """Generate suitable LV names.

  This will generate a logical volume name for each of the given
  extensions.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
    results.append("%s%s" % (new_id, val))
  return results
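

# Illustrative sketch (editorial addition, not part of the original module):
# _GenerateUniqueNames draws a fresh unique ID per requested extension, so a
# call such as
#
#   _GenerateUniqueNames(lu, [".disk0_data", ".disk0_meta"])
#
# returns something like
#
#   ["3added4c-....disk0_data", "c104d14e-....disk0_meta"]
#
# i.e. one cluster-unique LV name per extension, as used by
# TLReplaceDisks._CreateNewStorage below.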


class TLReplaceDisks(Tasklet):
  """Replaces disks for an instance.

  Note: Locking is not within the scope of this class.

  """
  def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name,
               remote_node_uuid, disks, early_release, ignore_ipolicy):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_uuid = instance_uuid
    self.instance_name = instance_name
    self.mode = mode
    self.iallocator_name = iallocator_name
    self.remote_node_uuid = remote_node_uuid
    self.disks = disks
    self.early_release = early_release
    self.ignore_ipolicy = ignore_ipolicy

    # Runtime data
    self.instance = None
    self.new_node_uuid = None
    self.target_node_uuid = None
    self.other_node_uuid = None
    self.remote_node_info = None
    self.node_secondary_ip = None

  @staticmethod
  def _RunAllocator(lu, iallocator_name, instance_uuid,
                    relocate_from_node_uuids):
    """Compute a new secondary node using an IAllocator.

    """
    req = iallocator.IAReqRelocate(
          inst_uuid=instance_uuid,
          relocate_from_node_uuids=list(relocate_from_node_uuids))
    ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)

    ial.Run(iallocator_name)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (iallocator_name, ial.info),
                                 errors.ECODE_NORES)

    remote_node_name = ial.result[0]
    remote_node = lu.cfg.GetNodeInfoByName(remote_node_name)

    if remote_node is None:
      raise errors.OpPrereqError("Node %s not found in configuration" %
                                 remote_node_name, errors.ECODE_NOENT)

    lu.LogInfo("Selected new secondary for instance '%s': %s",
               instance_uuid, remote_node_name)

    return remote_node.uuid

  def _FindFaultyDisks(self, node_uuid):
    """Wrapper for L{FindFaultyInstanceDisks}.

    """
    return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
                                   node_uuid, True)

  def _CheckDisksActivated(self, instance):
    """Checks if the instance disks are activated.

    @param instance: The instance to check disks
    @return: True if they are activated, False otherwise

    """
    node_uuids = instance.all_nodes

    for idx, dev in enumerate(instance.disks):
      for node_uuid in node_uuids:
        self.lu.LogInfo("Checking disk/%d on %s", idx,
                        self.cfg.GetNodeName(node_uuid))
        self.cfg.SetDiskID(dev, node_uuid)

        result = _BlockdevFind(self, node_uuid, dev, instance)

        if result.offline:
          continue
        elif result.fail_msg or not result.payload:
          return False

    return True

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.instance_name

    if self.instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances", errors.ECODE_INVAL)

    if len(self.instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(self.instance.secondary_nodes),
                                 errors.ECODE_FAULT)

    secondary_node_uuid = self.instance.secondary_nodes[0]

    if self.iallocator_name is None:
      remote_node_uuid = self.remote_node_uuid
    else:
      remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
                                            self.instance.uuid,
                                            self.instance.secondary_nodes)

    if remote_node_uuid is None:
      self.remote_node_info = None
    else:
      assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
             "Remote node '%s' is not locked" % remote_node_uuid

      self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node_uuid

    if remote_node_uuid == self.instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance", errors.ECODE_INVAL)

    if remote_node_uuid == secondary_node_uuid:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance",
                                 errors.ECODE_INVAL)

    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
                                    constants.REPLACE_DISK_CHG):
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
                                 errors.ECODE_INVAL)

    if self.mode == constants.REPLACE_DISK_AUTO:
      if not self._CheckDisksActivated(self.instance):
        raise errors.OpPrereqError("Please run activate-disks on instance %s"
                                   " first" % self.instance_name,
                                   errors.ECODE_STATE)
      faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
      faulty_secondary = self._FindFaultyDisks(secondary_node_uuid)

      if faulty_primary and faulty_secondary:
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
                                   " one node and can not be repaired"
                                   " automatically" % self.instance_name,
                                   errors.ECODE_STATE)

      if faulty_primary:
        self.disks = faulty_primary
        self.target_node_uuid = self.instance.primary_node
        self.other_node_uuid = secondary_node_uuid
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
      elif faulty_secondary:
        self.disks = faulty_secondary
        self.target_node_uuid = secondary_node_uuid
        self.other_node_uuid = self.instance.primary_node
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
      else:
        self.disks = []
        check_nodes = []

    else:
      # Non-automatic modes
      if self.mode == constants.REPLACE_DISK_PRI:
        self.target_node_uuid = self.instance.primary_node
        self.other_node_uuid = secondary_node_uuid
        check_nodes = [self.target_node_uuid, self.other_node_uuid]

      elif self.mode == constants.REPLACE_DISK_SEC:
        self.target_node_uuid = secondary_node_uuid
        self.other_node_uuid = self.instance.primary_node
        check_nodes = [self.target_node_uuid, self.other_node_uuid]

      elif self.mode == constants.REPLACE_DISK_CHG:
        self.new_node_uuid = remote_node_uuid
        self.other_node_uuid = self.instance.primary_node
        self.target_node_uuid = secondary_node_uuid
        check_nodes = [self.new_node_uuid, self.other_node_uuid]

        CheckNodeNotDrained(self.lu, remote_node_uuid)
        CheckNodeVmCapable(self.lu, remote_node_uuid)

        old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid)
        assert old_node_info is not None
        if old_node_info.offline and not self.early_release:
          # doesn't make sense to delay the release
          self.early_release = True
          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
                          " early-release mode", secondary_node_uuid)

      else:
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                     self.mode)

      # If not specified all disks should be replaced
      if not self.disks:
        self.disks = range(len(self.instance.disks))

    # TODO: This is ugly, but right now we can't distinguish between an
    # internally submitted opcode and an external one. We should fix that.
    if self.remote_node_info:
      # We change the node, let's verify it still meets instance policy
      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
      cluster = self.cfg.GetClusterInfo()
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              new_group_info)
      CheckTargetNodeIPolicy(self, ipolicy, self.instance,
                             self.remote_node_info, self.cfg,
                             ignore=self.ignore_ipolicy)

    for node_uuid in check_nodes:
      CheckNodeOnline(self.lu, node_uuid)

    touched_nodes = frozenset(node_uuid for node_uuid in [self.new_node_uuid,
                                                          self.other_node_uuid,
                                                          self.target_node_uuid]
                              if node_uuid is not None)

    # Release unneeded node and node resource locks
    ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    # Release any owned node group
    ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)

    # Check whether disks are valid
    for disk_idx in self.disks:
      self.instance.FindDisk(disk_idx)

    # Get secondary node IP addresses
    self.node_secondary_ip = dict((uuid, node.secondary_ip) for (uuid, node)
                                  in self.cfg.GetMultiNodeInfo(touched_nodes))

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    if __debug__:
      # Verify owned locks before starting operation
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
      assert set(owned_nodes) == set(self.node_secondary_ip), \
          ("Incorrect node locks, owning %s, expected %s" %
           (owned_nodes, self.node_secondary_ip.keys()))
      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
              self.lu.owned_locks(locking.LEVEL_NODE_RES))
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
      assert list(owned_instances) == [self.instance_name], \
          "Instance '%s' not locked" % self.instance_name

      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
          "Should not own any node group lock at this point"

    if not self.disks:
      feedback_fn("No disks need replacement for instance '%s'" %
                  self.instance.name)
      return

    feedback_fn("Replacing disk(s) %s for instance '%s'" %
                (utils.CommaJoin(self.disks), self.instance.name))
    feedback_fn("Current primary node: %s" %
                self.cfg.GetNodeName(self.instance.primary_node))
    feedback_fn("Current secondary node: %s" %
                utils.CommaJoin(self.cfg.GetNodeNames(
                                  self.instance.secondary_nodes)))

    activate_disks = not self.instance.disks_active

    # Activate the instance disks if we're replacing them on a down instance
    if activate_disks:
      StartInstanceDisks(self.lu, self.instance, True)

    try:
      # Should we replace the secondary node?
      if self.new_node_uuid is not None:
        fn = self._ExecDrbd8Secondary
      else:
        fn = self._ExecDrbd8DiskOnly

      result = fn(feedback_fn)
    finally:
      # Deactivate the instance disks if we're replacing them on a
      # down instance
      if activate_disks:
        _SafeShutdownInstanceDisks(self.lu, self.instance)

    assert not self.lu.owned_locks(locking.LEVEL_NODE)

    if __debug__:
      # Verify owned locks
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
      nodes = frozenset(self.node_secondary_ip)
      assert ((self.early_release and not owned_nodes) or
              (not self.early_release and not (set(owned_nodes) - nodes))), \
        ("Not owning the correct locks, early_release=%s, owned=%r,"
         " nodes=%r" % (self.early_release, owned_nodes, nodes))

    return result

  def _CheckVolumeGroup(self, node_uuids):
    self.lu.LogInfo("Checking volume groups")

    vgname = self.cfg.GetVGName()

    # Make sure volume group exists on all involved nodes
    results = self.rpc.call_vg_list(node_uuids)
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")

    for node_uuid in node_uuids:
      res = results[node_uuid]
      res.Raise("Error checking node %s" % self.cfg.GetNodeName(node_uuid))
      if vgname not in res.payload:
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
                                 (vgname, self.cfg.GetNodeName(node_uuid)))

  def _CheckDisksExistence(self, node_uuids):
    # Check disk existence
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      for node_uuid in node_uuids:
        self.lu.LogInfo("Checking disk/%d on %s", idx,
                        self.cfg.GetNodeName(node_uuid))
        self.cfg.SetDiskID(dev, node_uuid)

        result = _BlockdevFind(self, node_uuid, dev, self.instance)

        msg = result.fail_msg
        if msg or not result.payload:
          if not msg:
            msg = "disk not found"
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                   (idx, self.cfg.GetNodeName(node_uuid), msg))

  def _CheckDisksConsistency(self, node_uuid, on_primary, ldisk):
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                      (idx, self.cfg.GetNodeName(node_uuid)))

      if not CheckDiskConsistency(self.lu, self.instance, dev, node_uuid,
                                  on_primary, ldisk=ldisk):
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                                 " replace disks for instance %s" %
                                 (self.cfg.GetNodeName(node_uuid),
                                  self.instance.name))

  def _CreateNewStorage(self, node_uuid):
    """Create new storage on the primary or secondary node.

    This is only used for same-node replaces, not for changing the
    secondary node, hence we don't want to modify the existing disk.

    """
    iv_names = {}

    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    for idx, dev in enumerate(disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Adding storage on %s for disk/%d",
                      self.cfg.GetNodeName(node_uuid), idx)

      self.cfg.SetDiskID(dev, node_uuid)

      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
      names = _GenerateUniqueNames(self.lu, lv_names)

      (data_disk, meta_disk) = dev.children
      vg_data = data_disk.logical_id[0]
      lv_data = objects.Disk(dev_type=constants.DT_PLAIN, size=dev.size,
                             logical_id=(vg_data, names[0]),
                             params=data_disk.params)
      vg_meta = meta_disk.logical_id[0]
      lv_meta = objects.Disk(dev_type=constants.DT_PLAIN,
                             size=constants.DRBD_META_SIZE,
                             logical_id=(vg_meta, names[1]),
                             params=meta_disk.params)

      new_lvs = [lv_data, lv_meta]
      old_lvs = [child.Copy() for child in dev.children]
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)

      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        try:
          _CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True,
                               GetInstanceInfoText(self.instance), False,
                               excl_stor)
        except errors.DeviceCreationError, e:
          raise errors.OpExecError("Can't create block device: %s" % e.message)

    return iv_names
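
  # Editorial note (not part of the original module): the mapping returned by
  # _CreateNewStorage is keyed by the DRBD disk's iv_name and is later consumed
  # by _CheckDevices/_RemoveOldStorage, e.g. (illustrative only):
  #
  #   iv_names == {
  #     "disk/0": (drbd_dev, [old_data_lv, old_meta_lv],
  #                [new_data_lv, new_meta_lv]),
  #   }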

  def _CheckDevices(self, node_uuid, iv_names):
    for name, (dev, _, _) in iv_names.iteritems():
      self.cfg.SetDiskID(dev, node_uuid)

      result = _BlockdevFind(self, node_uuid, dev, self.instance)

      msg = result.fail_msg
      if msg or not result.payload:
        if not msg:
          msg = "disk not found"
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))

      if result.payload.is_degraded:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

  def _RemoveOldStorage(self, node_uuid, iv_names):
    for name, (_, old_lvs, _) in iv_names.iteritems():
      self.lu.LogInfo("Remove logical volumes for %s", name)

      for lv in old_lvs:
        self.cfg.SetDiskID(lv, node_uuid)
        msg = self.rpc.call_blockdev_remove(node_uuid, (lv, self.instance)) \
                .fail_msg
        if msg:
          self.lu.LogWarning("Can't remove old LV: %s", msg,
                             hint="remove unused LVs manually")

  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
    """Replace a disk on the primary or secondary for DRBD 8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
    self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(
      self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
      False)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    iv_names = self._CreateNewStorage(self.target_node_uuid)

    # Step: for each lv, detach+rename*2+attach
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)

      result = self.rpc.call_blockdev_removechildren(self.target_node_uuid,
                                                     (dev, self.instance),
                                                     (old_lvs, self.instance))
      result.Raise("Can't detach drbd from local storage on node"
                   " %s for device %s" %
                   (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)

      # Build the rename list based on what LVs exist on the node
      rename_old_to_new = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(self.target_node_uuid,
                                             (to_ren, self.instance))
        if not result.fail_msg and result.payload:
          # device exists
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))

      self.lu.LogInfo("Renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
                                             rename_old_to_new)
      result.Raise("Can't rename old LVs on node %s" %
                   self.cfg.GetNodeName(self.target_node_uuid))

      # Now we rename the new LVs to the old LVs
      self.lu.LogInfo("Renaming the new LVs on the target node")
      rename_new_to_old = [(new, old.physical_id)
                           for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
                                             rename_new_to_old)
      result.Raise("Can't rename new LVs on node %s" %
                   self.cfg.GetNodeName(self.target_node_uuid))

      # Intermediate steps of in memory modifications
      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id

      # We need to modify old_lvs so that removal later removes the
      # right LVs, not the newly added ones; note that old_lvs is a
      # copy here
      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)

      # Now that the new lvs have the old name, we can add them to the device
      self.lu.LogInfo("Adding new mirror component on %s",
                      self.cfg.GetNodeName(self.target_node_uuid))
      result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
                                                  (dev, self.instance),
                                                  (new_lvs, self.instance))
      msg = result.fail_msg
      if msg:
        for new_lv in new_lvs:
          msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
                                               (new_lv, self.instance)).fail_msg
          if msg2:
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                               hint=("cleanup manually the unused logical"
                                     " volumes"))
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)

    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # Release all node locks while waiting for sync
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)

  def _ExecDrbd8Secondary(self, feedback_fn):
    """Replace the secondary node for DRBD 8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6

    pnode = self.instance.primary_node

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.instance.primary_node])
    self._CheckVolumeGroup([self.instance.primary_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.instance.primary_node, True, True)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg,
                                                  self.new_node_uuid)
    for idx, dev in enumerate(disks):
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        try:
          _CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance,
                               new_lv, True, GetInstanceInfoText(self.instance),
                               False, excl_stor)
        except errors.DeviceCreationError, e:
          raise errors.OpExecError("Can't create block device: %s" % e.message)

    # Step 4: drbd minors and drbd setup changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    minors = self.cfg.AllocateDRBDMinor([self.new_node_uuid
                                         for _ in self.instance.disks],
                                        self.instance.uuid)
    logging.debug("Allocated minors %r", minors)

    iv_names = {}
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
      # create new devices on new_node; note that we create two IDs:
      # one without port, so the drbd will be activated without
      # networking information on the new node at this stage, and one
      # with network, for the later activation in step 4
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if self.instance.primary_node == o_node1:
        p_minor = o_minor1
      else:
        assert self.instance.primary_node == o_node2, "Three-node instance?"
        p_minor = o_minor2

      new_alone_id = (self.instance.primary_node, self.new_node_uuid, None,
                      p_minor, new_minor, o_secret)
      new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port,
                    p_minor, new_minor, o_secret)

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.DT_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children,
                              size=dev.size,
                              params={})
      (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
                                            self.cfg)
      try:
        CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance,
                             anno_new_drbd,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)
      except errors.GenericError:
        self.cfg.ReleaseDRBDMinors(self.instance.uuid)
        raise

    # We have new devices, shutdown the drbd on the old secondary
    for idx, dev in enumerate(self.instance.disks):
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
      self.cfg.SetDiskID(dev, self.target_node_uuid)
      msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid,
                                            (dev, self.instance)).fail_msg
      if msg:
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                           " node: %s" % (idx, msg),
                           hint=("Please cleanup this device manually as"
                                 " soon as possible"))

    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
    result = self.rpc.call_drbd_disconnect_net(
               [pnode], (self.instance.disks, self.instance))[pnode]

    msg = result.fail_msg
    if msg:
      # detaches didn't succeed (unlikely)
      self.cfg.ReleaseDRBDMinors(self.instance.uuid)
      raise errors.OpExecError("Can't detach the disks from the network on"
                               " old node: %s" % (msg,))

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    self.lu.LogInfo("Updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      self.cfg.SetDiskID(dev, self.instance.primary_node)

    self.cfg.Update(self.instance, feedback_fn)

    # Release all node locks (the configuration has been updated)
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # and now perform the drbd attach
    self.lu.LogInfo("Attaching primary drbds to new secondary"
                    " (standalone => connected)")
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
                                            self.new_node_uuid],
                                           (self.instance.disks, self.instance),
                                           self.instance.name,
                                           False)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
                           self.cfg.GetNodeName(to_node), msg,
                           hint=("please do a gnt-instance info to see the"
                                 " status of disks"))

    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)