#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with storage of instances."""

import itertools
import logging
import os
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import ht
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import objects
from ganeti import utils
from ganeti import opcodes
from ganeti import rpc
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
  AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \
  CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
  IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes, \
  CheckDiskTemplateEnabled
from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
  CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy

import ganeti.masterd.instance


_DISK_TEMPLATE_NAME_PREFIX = {
  constants.DT_PLAIN: "",
  constants.DT_RBD: ".rbd",
  constants.DT_EXT: ".ext",
  }


_DISK_TEMPLATE_DEVICE_TYPE = {
  constants.DT_PLAIN: constants.LD_LV,
  constants.DT_FILE: constants.LD_FILE,
  constants.DT_SHARED_FILE: constants.LD_FILE,
  constants.DT_BLOCK: constants.LD_BLOCKDEV,
  constants.DT_RBD: constants.LD_RBD,
  constants.DT_EXT: constants.LD_EXT,
  }


def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as an LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  """
  lu.cfg.SetDiskID(device, node_uuid)
  result = lu.rpc.call_blockdev_create(node_uuid, device, device.size,
                                       instance.name, force_open, info,
                                       excl_stor)
  result.Raise("Can't create block device %s on"
               " node %s for instance %s" % (device,
                                             lu.cfg.GetNodeName(node_uuid),
                                             instance.name))
  if device.physical_id is None:
    device.physical_id = result.payload


def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
                         info, force_open, excl_stor):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @attention: The device has to be annotated already.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has
      CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as an LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  @return: list of created devices
  """
  created_devices = []
  try:
    if device.CreateOnSecondary():
      force_create = True

    if device.children:
      for child in device.children:
        devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
                                    force_create, info, force_open, excl_stor)
        created_devices.extend(devs)

    if not force_create:
      return created_devices

    CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor)
    # The device has been completely created, so there is no point in keeping
    # its subdevices in the list. We just add the device itself instead.
    created_devices = [(node_uuid, device)]
    return created_devices

  except errors.DeviceCreationError, e:
    e.created_devices.extend(created_devices)
    raise e
  except errors.OpExecError, e:
    raise errors.DeviceCreationError(str(e), created_devices)


def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid):
  """Whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node_uuid: string
  @param node_uuid: The node UUID
  @rtype: bool
  @return: The effective value of exclusive_storage
  @raise errors.OpPrereqError: if no node exists with the given name

  """
  ni = cfg.GetNodeInfo(node_uuid)
  if ni is None:
    raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
                               errors.ECODE_NOENT)
  return IsExclusiveStorageEnabledNode(cfg, ni)


def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
                    force_open):
  """Wrapper around L{_CreateBlockDevInner}.

  This method annotates the root device first.

  """
  (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
  excl_stor = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
  return _CreateBlockDevInner(lu, node_uuid, instance, disk, force_create, info,
                              force_open, excl_stor)


def _UndoCreateDisks(lu, disks_created):
  """Undo the work performed by L{CreateDisks}.

  This function is called in case of an error to undo the work of
  L{CreateDisks}.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @param disks_created: the result returned by L{CreateDisks}

  """
  for (node_uuid, disk) in disks_created:
    lu.cfg.SetDiskID(disk, node_uuid)
    result = lu.rpc.call_blockdev_remove(node_uuid, disk)
    result.Warn("Failed to remove newly-created disk %s on node %s" %
                (disk, lu.cfg.GetNodeName(node_uuid)), logging.warning)


def CreateDisks(lu, instance, to_skip=None, target_node_uuid=None, disks=None):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @type to_skip: list
  @param to_skip: list of indices to skip
  @type target_node_uuid: string
  @param target_node_uuid: if passed, overrides the target node for creation
  @type disks: list of L{objects.Disk}
  @param disks: the disks to create; if not specified, all the disks of the
      instance are created
  @return: information about the created disks, to be used to call
      L{_UndoCreateDisks}
  @raise errors.OpPrereqError: in case of error

  """
  info = GetInstanceInfoText(instance)
  if target_node_uuid is None:
    pnode_uuid = instance.primary_node
    all_node_uuids = instance.all_nodes
  else:
    pnode_uuid = target_node_uuid
    all_node_uuids = [pnode_uuid]

  if disks is None:
    disks = instance.disks

  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), instance.disk_template)

  if instance.disk_template in constants.DTS_FILEBASED:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir)

    result.Raise("Failed to create directory '%s' on"
                 " node %s" % (file_storage_dir,
                               lu.cfg.GetNodeName(pnode_uuid)))

  disks_created = []
  for idx, device in enumerate(disks):
    if to_skip and idx in to_skip:
      continue
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
    for node_uuid in all_node_uuids:
      f_create = node_uuid == pnode_uuid
      try:
        _CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
                        f_create)
        disks_created.append((node_uuid, device))
      except errors.DeviceCreationError, e:
        logging.warning("Creating disk %s for instance '%s' failed",
                        idx, instance.name)
        disks_created.extend(e.created_devices)
        _UndoCreateDisks(lu, disks_created)
        raise errors.OpExecError(e.message)
  return disks_created


def ComputeDiskSizePerVG(disk_template, disks):
  """Compute disk size requirements in the volume group

  """
  def _compute(disks, payload):
    """Universal algorithm.

    """
    vgs = {}
    for disk in disks:
      vgs[disk[constants.IDISK_VG]] = \
        vgs.get(disk[constants.IDISK_VG], 0) + \
        disk[constants.IDISK_SIZE] + payload

    return vgs

  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: {},
    constants.DT_PLAIN: _compute(disks, 0),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
    constants.DT_FILE: {},
    constants.DT_SHARED_FILE: {},
    }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
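
# Example (illustrative, hypothetical values): for two DRBD8 disks of
# 1024 MiB and 2048 MiB that both live in volume group "xenvg",
# ComputeDiskSizePerVG(constants.DT_DRBD8, disks) returns {"xenvg": 3328},
# i.e. the data sizes plus 128 MiB of DRBD metadata per disk; the result can
# be passed as the req_sizes argument of CheckNodesFreeDiskPerVG below.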


def ComputeDisks(op, default_vg):
  """Computes the instance disks.

  @param op: The instance opcode
  @param default_vg: The default_vg to assume

  @return: The computed disks

  """
  disks = []
  for disk in op.disks:
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
    if mode not in constants.DISK_ACCESS_SET:
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                 mode, errors.ECODE_INVAL)
    size = disk.get(constants.IDISK_SIZE, None)
    if size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      size = int(size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                 errors.ECODE_INVAL)

    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
    if ext_provider and op.disk_template != constants.DT_EXT:
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
                                 " disk template, not %s" %
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
                                  op.disk_template), errors.ECODE_INVAL)

    data_vg = disk.get(constants.IDISK_VG, default_vg)
    name = disk.get(constants.IDISK_NAME, None)
    if name is not None and name.lower() == constants.VALUE_NONE:
      name = None
    new_disk = {
      constants.IDISK_SIZE: size,
      constants.IDISK_MODE: mode,
      constants.IDISK_VG: data_vg,
      constants.IDISK_NAME: name,
      }

    for key in [
      constants.IDISK_METAVG,
      constants.IDISK_ADOPT,
      constants.IDISK_SPINDLES,
      ]:
      if key in disk:
        new_disk[key] = disk[key]

    # For extstorage, demand the `provider' option and add any
    # additional parameters (ext-params) to the dict
    if op.disk_template == constants.DT_EXT:
      if ext_provider:
        new_disk[constants.IDISK_PROVIDER] = ext_provider
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            new_disk[key] = disk[key]
      else:
        raise errors.OpPrereqError("Missing provider for template '%s'" %
                                   constants.DT_EXT, errors.ECODE_INVAL)

    disks.append(new_disk)

  return disks
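
# Example (illustrative, hypothetical values): an opcode entry such as
#   {constants.IDISK_SIZE: "1024", constants.IDISK_MODE: constants.DISK_RDWR}
# is normalized by ComputeDisks into
#   {constants.IDISK_SIZE: 1024, constants.IDISK_MODE: constants.DISK_RDWR,
#    constants.IDISK_VG: default_vg, constants.IDISK_NAME: None}
# with the optional keys (metavg, adopt, spindles, and ext-params for the
# 'ext' template) copied through only when present in the input.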


def CheckRADOSFreeSpace():
  """Compute disk size requirements inside the RADOS cluster.

  """
  # For the RADOS cluster we assume there is always enough space.
  pass


def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
                         iv_name, p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  assert len(vgnames) == len(names) == 2
  port = lu.cfg.AllocatePort()
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())

  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgnames[0], names[0]),
                          params={})
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  dev_meta = objects.Disk(dev_type=constants.LD_LV,
                          size=constants.DRBD_META_SIZE,
                          logical_id=(vgnames[1], names[1]),
                          params={})
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary_uuid, secondary_uuid, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name, params={})
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  return drbd_dev
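
# Resulting device tree (illustrative sketch): one LD_DRBD8 disk whose
# logical_id carries (primary_uuid, secondary_uuid, port, p_minor, s_minor,
# shared_secret) and whose two LD_LV children hold the data volume of the
# requested size and the DRBD metadata volume of constants.DRBD_META_SIZE,
# each placed in the volume group passed in via vgnames.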


def GenerateDiskTemplate(
  lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
  disk_info, file_storage_dir, file_driver, base_index,
  feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
  _req_shr_file_storage=opcodes.RequireSharedFileStorage):
  """Generate the entire disk layout for a given template type.

  """
  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []

  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_DRBD8:
    if len(secondary_node_uuids) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node_uuid = secondary_node_uuids[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node_uuid, remote_node_uuid] * len(disk_info), instance_uuid)

    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
                                                       full_disk_params)
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]

    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      data_vg = disk.get(constants.IDISK_VG, vgname)
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
      disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
                                      disk[constants.IDISK_SIZE],
                                      [data_vg, meta_vg],
                                      names[idx * 2:idx * 2 + 2],
                                      "disk/%d" % disk_index,
                                      minors[idx * 2], minors[idx * 2 + 1])
      disk_dev.mode = disk[constants.IDISK_MODE]
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disks.append(disk_dev)
  else:
    if secondary_node_uuids:
      raise errors.ProgrammerError("Wrong template configuration")

    if template_name == constants.DT_FILE:
      _req_file_storage()
    elif template_name == constants.DT_SHARED_FILE:
      _req_shr_file_storage()

    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
    if name_prefix is None:
      names = None
    else:
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
                                        (name_prefix, base_index + i)
                                        for i in range(disk_count)])

    if template_name == constants.DT_PLAIN:

      def logical_id_fn(idx, _, disk):
        vg = disk.get(constants.IDISK_VG, vgname)
        return (vg, names[idx])

    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
      logical_id_fn = \
        lambda _, disk_index, disk: (file_driver,
                                     "%s/disk%d" % (file_storage_dir,
                                                    disk_index))
    elif template_name == constants.DT_BLOCK:
      logical_id_fn = \
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
                                       disk[constants.IDISK_ADOPT])
    elif template_name == constants.DT_RBD:
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
    elif template_name == constants.DT_EXT:
      def logical_id_fn(idx, _, disk):
        provider = disk.get(constants.IDISK_PROVIDER, None)
        if provider is None:
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
                                       " not found", constants.DT_EXT,
                                       constants.IDISK_PROVIDER)
        return (provider, names[idx])
    else:
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)

    dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]

    for idx, disk in enumerate(disk_info):
      params = {}
      # Only for the Ext template add disk_info to params
      if template_name == constants.DT_EXT:
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            params[key] = disk[key]
      disk_index = idx + base_index
      size = disk[constants.IDISK_SIZE]
      feedback_fn("* disk %s, size %s" %
                  (disk_index, utils.FormatUnit(size, "h")))
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
                              logical_id=logical_id_fn(idx, disk_index, disk),
                              iv_name="disk/%d" % disk_index,
                              mode=disk[constants.IDISK_MODE],
                              params=params,
                              spindles=disk.get(constants.IDISK_SPINDLES))
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
      disks.append(disk_dev)

  return disks
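
# Example (illustrative, hypothetical values): for constants.DT_PLAIN with
# base_index=0 and a single 1024 MiB read-write disk, the function emits one
# LD_LV disk with iv_name "disk/0" and a logical_id of (vg, name), where the
# name comes from _GenerateUniqueNames; for DT_DRBD8 the same input instead
# yields a DRBD8 device tree built by _GenerateDRBD8Branch above.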


def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
  """Check the presence of the spindle options with exclusive_storage.

  @type diskdict: dict
  @param diskdict: disk parameters
  @type es_flag: bool
  @param es_flag: the effective value of the exclusive_storage flag
  @type required: bool
  @param required: whether spindles are required or just optional
  @raise errors.OpPrereqError: when spindles are given and they should not

  """
  if (not es_flag and constants.IDISK_SPINDLES in diskdict and
      diskdict[constants.IDISK_SPINDLES] is not None):
    raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
                               " when exclusive storage is not active",
                               errors.ECODE_INVAL)
  if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or
                                diskdict[constants.IDISK_SPINDLES] is None)):
    raise errors.OpPrereqError("You must specify spindles in instance disks"
                               " when exclusive storage is active",
                               errors.ECODE_INVAL)
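
# Examples (illustrative, hypothetical values):
#   CheckSpindlesExclusiveStorage({constants.IDISK_SPINDLES: 2}, False, False)
#     -> raises OpPrereqError (spindles given without exclusive storage)
#   CheckSpindlesExclusiveStorage({}, True, True)
#     -> raises OpPrereqError (spindles required but missing)
#   CheckSpindlesExclusiveStorage({constants.IDISK_SPINDLES: 2}, True, True)
#     -> passes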


class LUInstanceRecreateDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  _MODIFYABLE = compat.UniqueFrozenset([
    constants.IDISK_SIZE,
    constants.IDISK_MODE,
    constants.IDISK_SPINDLES,
    ])

  # New or changed disk parameters may have different semantics
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
    constants.IDISK_ADOPT,

    # TODO: Implement support changing VG while recreating
    constants.IDISK_VG,
    constants.IDISK_METAVG,
    constants.IDISK_PROVIDER,
    constants.IDISK_NAME,
    ]))

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)

    # FIXME
    # The allocator should actually run in "relocate" mode, but current
    # allocators don't support relocating all the nodes of an instance at
    # the same time. As a workaround we use "allocate" mode, but this is
    # suboptimal for two reasons:
    # - The instance name passed to the allocator is present in the list of
    #   existing instances, so there could be a conflict within the
    #   internal structures of the allocator. This doesn't happen with the
    #   current allocators, but it's a liability.
    # - The allocator counts the resources used by the instance twice: once
    #   because the instance exists already, and once because it tries to
    #   allocate a new instance.
    # The allocator could choose some of the nodes on which the instance is
    # running, but that's not a problem. If the instance nodes are broken,
    # they should already be marked as drained or offline, and hence
    # skipped by the allocator. If instance disks have been lost for other
    # reasons, then recreating the disks on the same nodes should be fine.
    disk_template = self.instance.disk_template
    spindle_use = be_full[constants.BE_SPINDLE_USE]
    disks = [{
      constants.IDISK_SIZE: d.size,
      constants.IDISK_MODE: d.mode,
      constants.IDISK_SPINDLES: d.spindles,
      } for d in self.instance.disks]
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
                                        disk_template=disk_template,
                                        tags=list(self.instance.GetTags()),
                                        os=self.instance.os,
                                        nics=[{}],
                                        vcpus=be_full[constants.BE_VCPUS],
                                        memory=be_full[constants.BE_MAXMEM],
                                        spindle_use=spindle_use,
                                        disks=disks,
                                        hypervisor=self.instance.hypervisor,
                                        node_whitelist=None)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    assert req.RequiredNodes() == len(self.instance.all_nodes)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 utils.CommaJoin(self.op.nodes))

  def CheckArguments(self):
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
      # Normalize and convert deprecated list of disk indices
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]

    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
    if duplicates:
      raise errors.OpPrereqError("Some disks have been specified more than"
                                 " once: %s" % utils.CommaJoin(duplicates),
                                 errors.ECODE_INVAL)

    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
    # when neither iallocator nor nodes are specified
    if self.op.iallocator or self.op.nodes:
      CheckIAllocatorOrNode(self, "iallocator", "nodes")

    for (idx, params) in self.op.disks:
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
      if unsupported:
        raise errors.OpPrereqError("Parameters for disk %s try to change"
                                   " unmodifyable parameter(s): %s" %
                                   (idx, utils.CommaJoin(unsupported)),
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    if self.op.nodes:
      (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      if self.op.iallocator:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.iallocator is not None
      assert not self.op.nodes
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock the primary group used by the instance optimistically; this
      # requires going via the node before it's locked, requiring
      # verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, primary_only=True)

    elif level == locking.LEVEL_NODE:
      # If an allocator is used, then we lock all the nodes in the current
      # instance group, as we don't know yet which ones will be selected;
      # if we replace the nodes without using an allocator, locks are
      # already declared in ExpandNames; otherwise, we need to lock all the
      # instance nodes for disk re-creation
      if self.op.iallocator:
        assert not self.op.nodes
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1

        # Lock member nodes of the group of the primary node
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
          self.needed_locks[locking.LEVEL_NODE].extend(
            self.cfg.GetNodeGroup(group_uuid).members)

        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
      elif not self.op.nodes:
        self._LockInstancesNodes(primary_only=False)
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    return BuildInstanceHookEnvByObject(self, self.instance)

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    if self.op.node_uuids:
      if len(self.op.node_uuids) != len(instance.all_nodes):
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
                                   " %d replacement nodes were specified" %
                                   (instance.name, len(instance.all_nodes),
                                    len(self.op.node_uuids)),
                                   errors.ECODE_INVAL)
      assert instance.disk_template != constants.DT_DRBD8 or \
             len(self.op.node_uuids) == 2
      assert instance.disk_template != constants.DT_PLAIN or \
             len(self.op.node_uuids) == 1
      primary_node = self.op.node_uuids[0]
    else:
      primary_node = instance.primary_node
    if not self.op.iallocator:
      CheckNodeOnline(self, primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name, errors.ECODE_INVAL)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      # Node group locks are acquired only for the primary node (and only
      # when the allocator is used)
      CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
                              primary_only=True)

    # if we replace nodes *and* the old primary is offline, we don't
    # check the instance state
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
    if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                         msg="cannot recreate disks")

    if self.op.disks:
      self.disks = dict(self.op.disks)
    else:
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))

    maxidx = max(self.disks.keys())
    if maxidx >= len(instance.disks):
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
                                 errors.ECODE_INVAL)

    if ((self.op.node_uuids or self.op.iallocator) and
         sorted(self.disks.keys()) != range(len(instance.disks))):
      raise errors.OpPrereqError("Can't recreate disks partially and"
                                 " change the nodes at the same time",
                                 errors.ECODE_INVAL)

    self.instance = instance

    if self.op.iallocator:
      self._RunAllocator()
      # Release unneeded node and node resource locks
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)

    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

    if self.op.node_uuids:
      node_uuids = self.op.node_uuids
    else:
      node_uuids = instance.all_nodes
    excl_stor = compat.any(
      rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
      )
    for new_params in self.disks.values():
      CheckSpindlesExclusiveStorage(new_params, excl_stor, False)

  def Exec(self, feedback_fn):
    """Recreate the disks.

    """
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    to_skip = []
    mods = [] # keeps track of needed changes

    for idx, disk in enumerate(self.instance.disks):
      try:
        changes = self.disks[idx]
      except KeyError:
        # Disk should not be recreated
        to_skip.append(idx)
        continue

      # update secondaries for disks, if needed
      if self.op.node_uuids and disk.dev_type == constants.LD_DRBD8:
        # need to update the nodes and minors
        assert len(self.op.node_uuids) == 2
        assert len(disk.logical_id) == 6 # otherwise disk internals
                                         # have changed
        (_, _, old_port, _, _, old_secret) = disk.logical_id
        new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
                                                self.instance.uuid)
        new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
                  new_minors[0], new_minors[1], old_secret)
        assert len(disk.logical_id) == len(new_id)
      else:
        new_id = None

      mods.append((idx, new_id, changes))

    # now that we have passed all asserts above, we can apply the mods
    # in a single run (to avoid partial changes)
    for idx, new_id, changes in mods:
      disk = self.instance.disks[idx]
      if new_id is not None:
        assert disk.dev_type == constants.LD_DRBD8
        disk.logical_id = new_id
      if changes:
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
                    mode=changes.get(constants.IDISK_MODE, None),
                    spindles=changes.get(constants.IDISK_SPINDLES, None))

    # change primary node, if needed
    if self.op.node_uuids:
      self.instance.primary_node = self.op.node_uuids[0]
      self.LogWarning("Changing the instance's nodes, you will have to"
                      " remove any disks left on the older nodes manually")

    if self.op.node_uuids:
      self.cfg.Update(self.instance, feedback_fn)

    # All touched nodes must be locked
    mylocks = self.owned_locks(locking.LEVEL_NODE)
    assert mylocks.issuperset(frozenset(self.instance.all_nodes))
    new_disks = CreateDisks(self, self.instance, to_skip=to_skip)

    # TODO: Release node locks before wiping, or explain why it's not possible
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
      wipedisks = [(idx, disk, 0)
                   for (idx, disk) in enumerate(self.instance.disks)
                   if idx not in to_skip]
      WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
                         cleanup=new_disks)


def _PerformNodeInfoCall(lu, node_uuids, vg):
  """Prepares the input and performs a node info call.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: list of string
  @param node_uuids: list of node UUIDs to perform the call for
  @type vg: string
  @param vg: the volume group's name

  """
  lvm_storage_units = [(constants.ST_LVM_VG, vg)]
  storage_units = rpc.PrepareStorageUnitsForNodes(lu.cfg, lvm_storage_units,
                                                  node_uuids)
  hvname = lu.cfg.GetHypervisorType()
  hvparams = lu.cfg.GetClusterInfo().hvparams
  nodeinfo = lu.rpc.call_node_info(node_uuids, storage_units,
                                   [(hvname, hvparams[hvname])])
  return nodeinfo


def _CheckVgCapacityForNode(node_name, node_info, vg, requested):
  """Checks the vg capacity for a given node.

  @type node_info: tuple (_, list of dicts, _)
  @param node_info: the result of the node info call for one node
  @type node_name: string
  @param node_name: the name of the node
  @type vg: string
  @param vg: volume group name
  @type requested: int
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  (_, space_info, _) = node_info
  lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType(
      space_info, constants.ST_LVM_VG)
  if not lvm_vg_info:
    raise errors.OpPrereqError("Can't retrieve storage information for LVM")
  vg_free = lvm_vg_info.get("storage_free", None)
  if not isinstance(vg_free, int):
    raise errors.OpPrereqError("Can't compute free disk space on node"
                               " %s for vg %s, result was '%s'" %
                               (node_name, vg, vg_free), errors.ECODE_ENVIRON)
  if requested > vg_free:
    raise errors.OpPrereqError("Not enough disk space on target node %s"
                               " vg %s: required %d MiB, available %d MiB" %
                               (node_name, vg, requested, vg_free),
                               errors.ECODE_NORES)


def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
  """Checks if nodes have enough free disk space in the specified VG.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type vg: C{str}
  @param vg: the volume group to check
  @type requested: C{int}
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  nodeinfo = _PerformNodeInfoCall(lu, node_uuids, vg)
  for node in node_uuids:
    node_name = lu.cfg.GetNodeName(node)
    info = nodeinfo[node]
    info.Raise("Cannot get current information from node %s" % node_name,
               prereq=True, ecode=errors.ECODE_ENVIRON)
    _CheckVgCapacityForNode(node_name, info.payload, vg, requested)


def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
  """Checks if nodes have enough free disk space in all the VGs.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type req_sizes: C{dict}
  @param req_sizes: the hash of vg and corresponding amount of disk in
      MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  for vg, req_size in req_sizes.items():
    _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, req_size)
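
# Example (illustrative, hypothetical values): a typical call chain is
#   req_sizes = ComputeDiskSizePerVG(constants.DT_PLAIN, op_disks)
#   CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes)
# which performs one node-info RPC per volume group and raises OpPrereqError
# as soon as any node reports less free space than requested.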


def _DiskSizeInBytesToMebibytes(lu, size):
  """Converts a disk size in bytes to mebibytes.

  Warns and rounds up if the size isn't an even multiple of 1 MiB.

  """
  (mib, remainder) = divmod(size, 1024 * 1024)

  if remainder != 0:
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
                  " to not overwrite existing data (%s bytes will not be"
                  " wiped)", (1024 * 1024) - remainder)
    mib += 1

  return mib
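
# Example (illustrative, hypothetical values): for size = 1073741825 (1 GiB
# plus one byte), divmod gives (1024, 1), so the function logs a warning and
# returns 1025 MiB; for an exact multiple such as 1073741824 it returns 1024
# without warning.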


def _CalcEta(time_taken, written, total_size):
  """Calculates the ETA based on size written and total size.

  @param time_taken: The time taken so far
  @param written: amount written so far
  @param total_size: The total size of data to be written
  @return: The remaining time in seconds

  """
  avg_time = time_taken / float(written)
  return (total_size - written) * avg_time
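
# Example (illustrative, hypothetical values): _CalcEta(30.0, 256, 1024)
# averages 30.0 / 256 seconds per unit written and returns
# (1024 - 256) * 0.1171875, i.e. 90.0 seconds remaining.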


def WipeDisks(lu, instance, disks=None):
  """Wipes instance disks.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
  @param disks: Disk details; tuple contains disk index, disk object and the
    start offset

  """
  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  if disks is None:
    disks = [(idx, disk, 0)
             for (idx, disk) in enumerate(instance.disks)]

  for (_, device, _) in disks:
    lu.cfg.SetDiskID(device, node_uuid)

  logging.info("Pausing synchronization of disks of instance '%s'",
               instance.name)
  result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                  (map(compat.snd, disks),
                                                   instance),
                                                  True)
  result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)

  for idx, success in enumerate(result.payload):
    if not success:
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
                   " failed", idx, instance.name)

  try:
    for (idx, device, offset) in disks:
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
      wipe_chunk_size = \
        int(min(constants.MAX_WIPE_CHUNK,
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))

      size = device.size
      last_output = 0
      start_time = time.time()

      if offset == 0:
        info_text = ""
      else:
        info_text = (" (from %s to %s)" %
                     (utils.FormatUnit(offset, "h"),
                      utils.FormatUnit(size, "h")))

      lu.LogInfo("* Wiping disk %s%s", idx, info_text)

      logging.info("Wiping disk %d for instance %s on node %s using"
                   " chunk size %s", idx, instance.name, node_name,
                   wipe_chunk_size)

      while offset < size:
        wipe_size = min(wipe_chunk_size, size - offset)

        logging.debug("Wiping disk %d, offset %s, chunk %s",
                      idx, offset, wipe_size)

        result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
                                           offset, wipe_size)
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
                     (idx, offset, wipe_size))

        now = time.time()
        offset += wipe_size
        if now - last_output >= 60:
          eta = _CalcEta(now - start_time, offset, size)
          lu.LogInfo(" - done: %.1f%% ETA: %s",
                     offset / float(size) * 100, utils.FormatSeconds(eta))
          last_output = now
  finally:
    logging.info("Resuming synchronization of disks for instance '%s'",
                 instance.name)

    result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                    (map(compat.snd, disks),
                                                     instance),
                                                    False)

    if result.fail_msg:
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
                    node_name, result.fail_msg)
    else:
      for idx, success in enumerate(result.payload):
        if not success:
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
                        " failed", idx, instance.name)


def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
  """Wrapper for L{WipeDisks} that handles errors.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @param disks: see L{WipeDisks}
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
      case of error
  @raise errors.OpPrereqError: in case of failure

  """
  try:
    WipeDisks(lu, instance, disks=disks)
  except errors.OpExecError:
    logging.warning("Wiping disks for instance '%s' failed",
                    instance.name)
    _UndoCreateDisks(lu, cleanup)
    raise


def ExpandCheckDisks(instance, disks):
  """Return the instance disks selected by the disks list

  @type disks: list of L{objects.Disk} or None
  @param disks: selected disks
  @rtype: list of L{objects.Disk}
  @return: selected instance disks to act on

  """
  if disks is None:
    return instance.disks
  else:
    if not set(disks).issubset(instance.disks):
      raise errors.ProgrammerError("Can only act on disks belonging to the"
                                   " target instance: expected a subset of %r,"
                                   " got %r" % (instance.disks, disks))
    return disks


def WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)

  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  for dev in disks:
    lu.cfg.SetDiskID(dev, node_uuid)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node_name)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node_name, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.LogInfo("- device %s: %5.2f%% done, %s",
                   disks[i].iv_name, mstat.sync_percent, rem_time)

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.LogInfo("Instance %s's disks are in sync", instance.name)

  return not cumul_degraded


def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored.

  """
  lu.cfg.MarkInstanceDisksInactive(instance.uuid)
  all_result = True
  disks = ExpandCheckDisks(instance, disks)

  for disk in disks:
    for node_uuid, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node_uuid)
      result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        if ((node_uuid == instance.primary_node and not ignore_primary) or
            (node_uuid != instance.primary_node and not result.offline)):
          all_result = False
  return all_result
1253

    
1254

    
1255
def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1256
  """Shutdown block devices of an instance.
1257

1258
  This function checks if an instance is running, before calling
1259
  _ShutdownInstanceDisks.
1260

1261
  """
1262
  CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1263
  ShutdownInstanceDisks(lu, instance, disks=disks)
1264

    
1265

    
1266
def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
                          ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type disks: list of L{objects.Disk} or None
  @param disks: which disks to assemble (or all, if None)
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: False if the operation failed, otherwise a list of
      (host, instance_visible_name, node_visible_name)
      with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  disks = ExpandCheckDisks(instance, disks)

  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # mark instance disks as active before doing actual work, so watcher does
  # not try to shut them down erroneously
  lu.cfg.MarkInstanceDisksActive(instance.uuid)

  # 1st pass, assemble on all nodes in secondary mode
  for idx, inst_disk in enumerate(disks):
    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
                                  instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node_uuid)
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                             instance.name, False, idx)
      msg = result.fail_msg
      if msg:
        is_offline_secondary = (node_uuid in instance.secondary_nodes and
                                result.offline)
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=False, pass=1): %s",
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        if not (ignore_secondaries or is_offline_secondary):
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for idx, inst_disk in enumerate(disks):
    dev_path = None

    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
                                  instance.primary_node):
      if node_uuid != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node_uuid)
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                             instance.name, True, idx)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=True, pass=2): %s",
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        disks_ok = False
      else:
        dev_path = result.payload

    device_info.append((lu.cfg.GetNodeName(instance.primary_node),
                        inst_disk.iv_name, dev_path))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  if not disks_ok:
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)

  return disks_ok, device_info


def StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = AssembleInstanceDisks(lu, instance,
                                      ignore_secondaries=force)
  if not disks_ok:
    ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.LogWarning("",
                    hint=("If the message above refers to a secondary node,"
                          " you can retry the operation using '--force'"))
    raise errors.OpExecError("Disk consistency error")


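# Illustrative sketch of the opcode handled by LUInstanceGrowDisk below; the
# field names are inferred from the self.op attributes used in this class and
# the exact constructor signature should be treated as an assumption:
#
#   op = opcodes.OpInstanceGrowDisk(instance_name="inst1.example.com",
#                                   disk=0, amount=2048, absolute=False,
#                                   wait_for_sync=True)
#
# With absolute=False the amount is a delta in mebibytes; with absolute=True
# it is the requested final size (see CheckPrereq below).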
class LUInstanceGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.needed_locks[locking.LEVEL_NODE_RES] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      "ABSOLUTE": self.op.absolute,
      }
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    node_uuids = list(self.instance.all_nodes)
    for node_uuid in node_uuids:
      CheckNodeOnline(self, node_uuid)
    self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)

    if self.instance.disk_template not in constants.DTS_GROWABLE:
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing", errors.ECODE_INVAL)

    self.disk = self.instance.FindDisk(self.op.disk)

    if self.op.absolute:
      self.target = self.op.amount
      self.delta = self.target - self.disk.size
      if self.delta < 0:
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
                                   "current disk size (%s)" %
                                   (utils.FormatUnit(self.target, "h"),
                                    utils.FormatUnit(self.disk.size, "h")),
                                   errors.ECODE_STATE)
    else:
      self.delta = self.op.amount
      self.target = self.disk.size + self.delta
      if self.delta < 0:
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
                                   utils.FormatUnit(self.delta, "h"),
                                   errors.ECODE_INVAL)

    self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta))

  def _CheckDiskSpace(self, node_uuids, req_vgspace):
    template = self.instance.disk_template
    if (template not in (constants.DTS_NO_FREE_SPACE_CHECK) and
        not any(self.node_es_flags.values())):
      # TODO: check the free disk space for file, when that feature will be
      # supported
      # With exclusive storage we need to do something smarter than just looking
      # at free space, which, in the end, is basically a dry run. So we rely on
      # the dry run performed in Exec() instead.
      CheckNodesFreeDiskPerVG(self, node_uuids, req_vgspace)

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks

    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[self.disk])
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block device to grow")

    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
                (self.op.disk, self.instance.name,
                 utils.FormatUnit(self.delta, "h"),
                 utils.FormatUnit(self.target, "h")))

    # First run all grow ops in dry-run mode
    for node_uuid in self.instance.all_nodes:
      self.cfg.SetDiskID(self.disk, node_uuid)
      result = self.rpc.call_blockdev_grow(node_uuid,
                                           (self.disk, self.instance),
                                           self.delta, True, True,
                                           self.node_es_flags[node_uuid])
      result.Raise("Dry-run grow request failed to node %s" %
                   self.cfg.GetNodeName(node_uuid))

    if wipe_disks:
      # Get disk size from primary node for wiping
      self.cfg.SetDiskID(self.disk, self.instance.primary_node)
      result = self.rpc.call_blockdev_getdimensions(self.instance.primary_node,
                                                    [self.disk])
      result.Raise("Failed to retrieve disk size from node '%s'" %
                   self.instance.primary_node)

      (disk_dimensions, ) = result.payload

      if disk_dimensions is None:
        raise errors.OpExecError("Failed to retrieve disk size from primary"
                                 " node '%s'" % self.instance.primary_node)
      (disk_size_in_bytes, _) = disk_dimensions

      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)

      assert old_disk_size >= self.disk.size, \
        ("Retrieved disk size too small (got %s, should be at least %s)" %
         (old_disk_size, self.disk.size))
    else:
      old_disk_size = None

    # We know that (as far as we can test) operations across different
    # nodes will succeed, time to run it for real on the backing storage
    for node_uuid in self.instance.all_nodes:
      self.cfg.SetDiskID(self.disk, node_uuid)
      result = self.rpc.call_blockdev_grow(node_uuid,
                                           (self.disk, self.instance),
                                           self.delta, False, True,
                                           self.node_es_flags[node_uuid])
      result.Raise("Grow request failed to node %s" %
                   self.cfg.GetNodeName(node_uuid))

    # And now execute it for logical storage, on the primary node
    node_uuid = self.instance.primary_node
    self.cfg.SetDiskID(self.disk, node_uuid)
    result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance),
                                         self.delta, False, False,
                                         self.node_es_flags[node_uuid])
    result.Raise("Grow request failed to node %s" %
                 self.cfg.GetNodeName(node_uuid))

    self.disk.RecordGrow(self.delta)
    self.cfg.Update(self.instance, feedback_fn)

    # Changes have been recorded, release node lock
    ReleaseLocks(self, locking.LEVEL_NODE)

    # Downgrade lock while waiting for sync
    self.glm.downgrade(locking.LEVEL_INSTANCE)

    assert wipe_disks ^ (old_disk_size is None)

    if wipe_disks:
      assert self.instance.disks[self.op.disk] == self.disk

      # Wipe newly added disk space
      WipeDisks(self, self.instance,
                disks=[(self.op.disk, self.disk, old_disk_size)])

    if self.op.wait_for_sync:
      disk_abort = not WaitForSync(self, self.instance, disks=[self.disk])
      if disk_abort:
        self.LogWarning("Disk syncing has not returned a good status; check"
                        " the instance")
      if not self.instance.disks_active:
        _SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk])
    elif not self.instance.disks_active:
      self.LogWarning("Not shutting down the disk even if the instance is"
                      " not supposed to be running because no wait for"
                      " sync mode was requested")

    assert self.owned_locks(locking.LEVEL_NODE_RES)
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)


class LUInstanceReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check arguments.

    """
    if self.op.mode == constants.REPLACE_DISK_CHG:
      if self.op.remote_node is None and self.op.iallocator is None:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given", errors.ECODE_INVAL)
      else:
        CheckIAllocatorOrNode(self, "iallocator", "remote_node")

    elif self.op.remote_node is not None or self.op.iallocator is not None:
      # Not replacing the secondary
      raise errors.OpPrereqError("The iallocator and new node options can"
                                 " only be used when changing the"
                                 " secondary node", errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    assert locking.LEVEL_NODE not in self.needed_locks
    assert locking.LEVEL_NODE_RES not in self.needed_locks
    assert locking.LEVEL_NODEGROUP not in self.needed_locks

    assert self.op.iallocator is None or self.op.remote_node is None, \
      "Conflicting options"

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8Dev.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

      if self.op.iallocator is not None:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

    self.replacer = TLReplaceDisks(self, self.op.instance_uuid,
                                   self.op.instance_name, self.op.mode,
                                   self.op.iallocator, self.op.remote_node_uuid,
                                   self.op.disks, self.op.early_release,
                                   self.op.ignore_ipolicy)

    self.tasklets = [self.replacer]

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.remote_node_uuid is None
      assert self.op.iallocator is not None
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock all groups used by instance optimistically; this requires going
      # via the node before it's locked, requiring verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid)

    elif level == locking.LEVEL_NODE:
      if self.op.iallocator is not None:
        assert self.op.remote_node_uuid is None
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)

        # Lock member nodes of all locked groups
        self.needed_locks[locking.LEVEL_NODE] = \
          [node_uuid
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
           for node_uuid in self.cfg.GetNodeGroup(group_uuid).members]
      else:
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

        self._LockInstancesNodes()

    elif level == locking.LEVEL_NODE_RES:
      # Reuse node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        self.needed_locks[locking.LEVEL_NODE]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    instance = self.replacer.instance
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.cfg.GetNodeName(instance.secondary_nodes[0]),
      }
    env.update(BuildInstanceHookEnvByObject(self, instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self.replacer.instance
    nl = [
      self.cfg.GetMasterNode(),
      instance.primary_node,
      ]
    if self.op.remote_node_uuid is not None:
      nl.append(self.op.remote_node_uuid)
    return nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
            self.op.iallocator is None)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      CheckInstanceNodeGroups(self.cfg, self.op.instance_uuid, owned_groups)

    return LogicalUnit.CheckPrereq(self)


class LUInstanceActivateDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              AssembleInstanceDisks(self, self.instance,
                                    ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    if self.op.wait_for_sync:
      if not WaitForSync(self, self.instance):
        self.cfg.MarkInstanceDisksInactive(self.instance.uuid)
        raise errors.OpExecError("Some disks of the instance are degraded!")

    return disks_info


class LUInstanceDeactivateDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    if self.op.force:
      ShutdownInstanceDisks(self, self.instance)
    else:
      _SafeShutdownInstanceDisks(self, self.instance)


def _CheckDiskConsistencyInner(lu, instance, dev, node_uuid, on_primary,
                               ldisk=False):
  """Check that mirrors are not degraded.

  @attention: The device has to be annotated already.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node_uuid)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node_uuid, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s",
                    lu.cfg.GetNodeName(node_uuid), msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid))
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistencyInner(lu, instance, child,
                                                     node_uuid, on_primary)

  return result


def CheckDiskConsistency(lu, instance, dev, node_uuid, on_primary, ldisk=False):
  """Wrapper around L{_CheckDiskConsistencyInner}.

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return _CheckDiskConsistencyInner(lu, instance, disk, node_uuid, on_primary,
                                    ldisk=ldisk)


def _BlockdevFind(lu, node_uuid, dev, instance):
  """Wrapper around call_blockdev_find to annotate diskparams.

  @param lu: A reference to the lu object
  @param node_uuid: The node to call out
  @param dev: The device to find
  @param instance: The instance object the device belongs to
  @returns The result of the rpc call

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return lu.rpc.call_blockdev_find(node_uuid, disk)


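# Illustrative sketch for _GenerateUniqueNames below: each result is a
# cluster-unique ID with the caller-provided suffix appended, so a call such
# as
#
#   _GenerateUniqueNames(lu, [".disk0_data", ".disk0_meta"])
#
# yields names of the form "<unique-id>.disk0_data" and
# "<unique-id>.disk0_meta".  _CreateNewStorage() later in this file relies on
# exactly this pattern to name the replacement data and metadata LVs.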
def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
    results.append("%s%s" % (new_id, val))
  return results


class TLReplaceDisks(Tasklet):
  """Replaces disks for an instance.

  Note: Locking is not within the scope of this class.

  """
  def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name,
               remote_node_uuid, disks, early_release, ignore_ipolicy):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_uuid = instance_uuid
    self.instance_name = instance_name
    self.mode = mode
    self.iallocator_name = iallocator_name
    self.remote_node_uuid = remote_node_uuid
    self.disks = disks
    self.early_release = early_release
    self.ignore_ipolicy = ignore_ipolicy

    # Runtime data
    self.instance = None
    self.new_node_uuid = None
    self.target_node_uuid = None
    self.other_node_uuid = None
    self.remote_node_info = None
    self.node_secondary_ip = None

  @staticmethod
  def _RunAllocator(lu, iallocator_name, instance_uuid,
                    relocate_from_node_uuids):
    """Compute a new secondary node using an IAllocator.

    """
    req = iallocator.IAReqRelocate(
          inst_uuid=instance_uuid,
          relocate_from_node_uuids=list(relocate_from_node_uuids))
    ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)

    ial.Run(iallocator_name)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (iallocator_name, ial.info),
                                 errors.ECODE_NORES)

    remote_node_name = ial.result[0]
    remote_node = lu.cfg.GetNodeInfoByName(remote_node_name)

    if remote_node is None:
      raise errors.OpPrereqError("Node %s not found in configuration" %
                                 remote_node_name, errors.ECODE_NOENT)

    lu.LogInfo("Selected new secondary for instance '%s': %s",
               instance_uuid, remote_node_name)

    return remote_node.uuid

  def _FindFaultyDisks(self, node_uuid):
    """Wrapper for L{FindFaultyInstanceDisks}.

    """
    return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
                                   node_uuid, True)

  def _CheckDisksActivated(self, instance):
    """Checks if the instance disks are activated.

    @param instance: The instance to check disks
    @return: True if they are activated, False otherwise

    """
    node_uuids = instance.all_nodes

    for idx, dev in enumerate(instance.disks):
      for node_uuid in node_uuids:
        self.lu.LogInfo("Checking disk/%d on %s", idx,
                        self.cfg.GetNodeName(node_uuid))
        self.cfg.SetDiskID(dev, node_uuid)

        result = _BlockdevFind(self, node_uuid, dev, instance)

        if result.offline:
          continue
        elif result.fail_msg or not result.payload:
          return False

    return True

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.instance_name

    if self.instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances", errors.ECODE_INVAL)

    if len(self.instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(self.instance.secondary_nodes),
                                 errors.ECODE_FAULT)

    secondary_node_uuid = self.instance.secondary_nodes[0]

    if self.iallocator_name is None:
      remote_node_uuid = self.remote_node_uuid
    else:
      remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
                                            self.instance.uuid,
                                            self.instance.secondary_nodes)

    if remote_node_uuid is None:
      self.remote_node_info = None
    else:
      assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
             "Remote node '%s' is not locked" % remote_node_uuid

      self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node_uuid

    if remote_node_uuid == self.instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance", errors.ECODE_INVAL)

    if remote_node_uuid == secondary_node_uuid:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance",
                                 errors.ECODE_INVAL)

    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
                                    constants.REPLACE_DISK_CHG):
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
                                 errors.ECODE_INVAL)

    if self.mode == constants.REPLACE_DISK_AUTO:
      if not self._CheckDisksActivated(self.instance):
        raise errors.OpPrereqError("Please run activate-disks on instance %s"
                                   " first" % self.instance_name,
                                   errors.ECODE_STATE)
      faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
      faulty_secondary = self._FindFaultyDisks(secondary_node_uuid)

      if faulty_primary and faulty_secondary:
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
                                   " one node and can not be repaired"
                                   " automatically" % self.instance_name,
                                   errors.ECODE_STATE)

      if faulty_primary:
        self.disks = faulty_primary
        self.target_node_uuid = self.instance.primary_node
        self.other_node_uuid = secondary_node_uuid
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
      elif faulty_secondary:
        self.disks = faulty_secondary
        self.target_node_uuid = secondary_node_uuid
        self.other_node_uuid = self.instance.primary_node
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
      else:
        self.disks = []
        check_nodes = []

    else:
      # Non-automatic modes
      if self.mode == constants.REPLACE_DISK_PRI:
        self.target_node_uuid = self.instance.primary_node
        self.other_node_uuid = secondary_node_uuid
        check_nodes = [self.target_node_uuid, self.other_node_uuid]

      elif self.mode == constants.REPLACE_DISK_SEC:
        self.target_node_uuid = secondary_node_uuid
        self.other_node_uuid = self.instance.primary_node
        check_nodes = [self.target_node_uuid, self.other_node_uuid]

      elif self.mode == constants.REPLACE_DISK_CHG:
        self.new_node_uuid = remote_node_uuid
        self.other_node_uuid = self.instance.primary_node
        self.target_node_uuid = secondary_node_uuid
        check_nodes = [self.new_node_uuid, self.other_node_uuid]

        CheckNodeNotDrained(self.lu, remote_node_uuid)
        CheckNodeVmCapable(self.lu, remote_node_uuid)

        old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid)
        assert old_node_info is not None
        if old_node_info.offline and not self.early_release:
          # doesn't make sense to delay the release
          self.early_release = True
          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
                          " early-release mode", secondary_node_uuid)

      else:
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                     self.mode)

      # If not specified all disks should be replaced
      if not self.disks:
        self.disks = range(len(self.instance.disks))

    # TODO: This is ugly, but right now we can't distinguish between internal
    # submitted opcode and external one. We should fix that.
    if self.remote_node_info:
      # We change the node, lets verify it still meets instance policy
      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
      cluster = self.cfg.GetClusterInfo()
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              new_group_info)
      CheckTargetNodeIPolicy(self, ipolicy, self.instance,
                             self.remote_node_info, self.cfg,
                             ignore=self.ignore_ipolicy)

    for node_uuid in check_nodes:
      CheckNodeOnline(self.lu, node_uuid)

    touched_nodes = frozenset(node_uuid for node_uuid in [self.new_node_uuid,
                                                          self.other_node_uuid,
                                                          self.target_node_uuid]
                              if node_uuid is not None)

    # Release unneeded node and node resource locks
    ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    # Release any owned node group
    ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)

    # Check whether disks are valid
    for disk_idx in self.disks:
      self.instance.FindDisk(disk_idx)

    # Get secondary node IP addresses
    self.node_secondary_ip = dict((uuid, node.secondary_ip) for (uuid, node)
                                  in self.cfg.GetMultiNodeInfo(touched_nodes))

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    if __debug__:
      # Verify owned locks before starting operation
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
      assert set(owned_nodes) == set(self.node_secondary_ip), \
          ("Incorrect node locks, owning %s, expected %s" %
           (owned_nodes, self.node_secondary_ip.keys()))
      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
              self.lu.owned_locks(locking.LEVEL_NODE_RES))
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
      assert list(owned_instances) == [self.instance_name], \
          "Instance '%s' not locked" % self.instance_name

      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
          "Should not own any node group lock at this point"

    if not self.disks:
      feedback_fn("No disks need replacement for instance '%s'" %
                  self.instance.name)
      return

    feedback_fn("Replacing disk(s) %s for instance '%s'" %
                (utils.CommaJoin(self.disks), self.instance.name))
    feedback_fn("Current primary node: %s" %
                self.cfg.GetNodeName(self.instance.primary_node))
    feedback_fn("Current secondary node: %s" %
                utils.CommaJoin(self.cfg.GetNodeNames(
                                  self.instance.secondary_nodes)))

    activate_disks = not self.instance.disks_active

    # Activate the instance disks if we're replacing them on a down instance
    if activate_disks:
      StartInstanceDisks(self.lu, self.instance, True)

    try:
      # Should we replace the secondary node?
      if self.new_node_uuid is not None:
        fn = self._ExecDrbd8Secondary
      else:
        fn = self._ExecDrbd8DiskOnly

      result = fn(feedback_fn)
    finally:
      # Deactivate the instance disks if we're replacing them on a
      # down instance
      if activate_disks:
        _SafeShutdownInstanceDisks(self.lu, self.instance)

    assert not self.lu.owned_locks(locking.LEVEL_NODE)

    if __debug__:
      # Verify owned locks
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
      nodes = frozenset(self.node_secondary_ip)
      assert ((self.early_release and not owned_nodes) or
              (not self.early_release and not (set(owned_nodes) - nodes))), \
        ("Not owning the correct locks, early_release=%s, owned=%r,"
         " nodes=%r" % (self.early_release, owned_nodes, nodes))

    return result

  def _CheckVolumeGroup(self, node_uuids):
    self.lu.LogInfo("Checking volume groups")

    vgname = self.cfg.GetVGName()

    # Make sure volume group exists on all involved nodes
    results = self.rpc.call_vg_list(node_uuids)
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")

    for node_uuid in node_uuids:
      res = results[node_uuid]
      res.Raise("Error checking node %s" % self.cfg.GetNodeName(node_uuid))
      if vgname not in res.payload:
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
                                 (vgname, self.cfg.GetNodeName(node_uuid)))

  def _CheckDisksExistence(self, node_uuids):
    # Check disk existence
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      for node_uuid in node_uuids:
        self.lu.LogInfo("Checking disk/%d on %s", idx,
                        self.cfg.GetNodeName(node_uuid))
        self.cfg.SetDiskID(dev, node_uuid)

        result = _BlockdevFind(self, node_uuid, dev, self.instance)

        msg = result.fail_msg
        if msg or not result.payload:
          if not msg:
            msg = "disk not found"
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                   (idx, self.cfg.GetNodeName(node_uuid), msg))

  def _CheckDisksConsistency(self, node_uuid, on_primary, ldisk):
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                      (idx, self.cfg.GetNodeName(node_uuid)))

      if not CheckDiskConsistency(self.lu, self.instance, dev, node_uuid,
                                  on_primary, ldisk=ldisk):
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                                 " replace disks for instance %s" %
                                 (self.cfg.GetNodeName(node_uuid),
                                  self.instance.name))

  def _CreateNewStorage(self, node_uuid):
    """Create new storage on the primary or secondary node.

    This is only used for same-node replaces, not for changing the
    secondary node, hence we don't want to modify the existing disk.

    """
    iv_names = {}

    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    for idx, dev in enumerate(disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Adding storage on %s for disk/%d",
                      self.cfg.GetNodeName(node_uuid), idx)

      self.cfg.SetDiskID(dev, node_uuid)

      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
      names = _GenerateUniqueNames(self.lu, lv_names)

      (data_disk, meta_disk) = dev.children
      vg_data = data_disk.logical_id[0]
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
                             logical_id=(vg_data, names[0]),
                             params=data_disk.params)
      vg_meta = meta_disk.logical_id[0]
      lv_meta = objects.Disk(dev_type=constants.LD_LV,
                             size=constants.DRBD_META_SIZE,
                             logical_id=(vg_meta, names[1]),
                             params=meta_disk.params)

      new_lvs = [lv_data, lv_meta]
      old_lvs = [child.Copy() for child in dev.children]
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)

      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        try:
          _CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True,
                               GetInstanceInfoText(self.instance), False,
                               excl_stor)
        except errors.DeviceCreationError, e:
          raise errors.OpExecError("Can't create block device: %s" % e.message)

    return iv_names

  def _CheckDevices(self, node_uuid, iv_names):
    for name, (dev, _, _) in iv_names.iteritems():
      self.cfg.SetDiskID(dev, node_uuid)

      result = _BlockdevFind(self, node_uuid, dev, self.instance)

      msg = result.fail_msg
      if msg or not result.payload:
        if not msg:
          msg = "disk not found"
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))

      if result.payload.is_degraded:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

  def _RemoveOldStorage(self, node_uuid, iv_names):
    for name, (_, old_lvs, _) in iv_names.iteritems():
      self.lu.LogInfo("Remove logical volumes for %s", name)

      for lv in old_lvs:
        self.cfg.SetDiskID(lv, node_uuid)

        msg = self.rpc.call_blockdev_remove(node_uuid, lv).fail_msg
        if msg:
          self.lu.LogWarning("Can't remove old LV: %s", msg,
                             hint="remove unused LVs manually")

  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
    """Replace a disk on the primary or secondary for DRBD 8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
    self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(
      self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
      False)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    iv_names = self._CreateNewStorage(self.target_node_uuid)

    # Step: for each lv, detach+rename*2+attach
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)

      result = self.rpc.call_blockdev_removechildren(self.target_node_uuid, dev,
                                                     old_lvs)
      result.Raise("Can't detach drbd from local storage on node"
                   " %s for device %s" %
                   (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)

      # Build the rename list based on what LVs exist on the node
      rename_old_to_new = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(self.target_node_uuid, to_ren)
        if not result.fail_msg and result.payload:
          # device exists
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))

      self.lu.LogInfo("Renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
                                             rename_old_to_new)
      result.Raise("Can't rename old LVs on node %s" %
                   self.cfg.GetNodeName(self.target_node_uuid))

      # Now we rename the new LVs to the old LVs
      self.lu.LogInfo("Renaming the new LVs on the target node")
      rename_new_to_old = [(new, old.physical_id)
                           for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
                                             rename_new_to_old)
      result.Raise("Can't rename new LVs on node %s" %
                   self.cfg.GetNodeName(self.target_node_uuid))

      # Intermediate steps of in memory modifications
      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        self.cfg.SetDiskID(new, self.target_node_uuid)

      # We need to modify old_lvs so that removal later removes the
      # right LVs, not the newly added ones; note that old_lvs is a
      # copy here
      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        self.cfg.SetDiskID(disk, self.target_node_uuid)

      # Now that the new lvs have the old name, we can add them to the device
      self.lu.LogInfo("Adding new mirror component on %s",
                      self.cfg.GetNodeName(self.target_node_uuid))
      result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
                                                  (dev, self.instance), new_lvs)
      msg = result.fail_msg
      if msg:
        for new_lv in new_lvs:
          msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
                                               new_lv).fail_msg
          if msg2:
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                               hint=("cleanup manually the unused logical"
                                     " volumes"))
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)

    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # Release all node locks while waiting for sync
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)

  def _ExecDrbd8Secondary(self, feedback_fn):
    """Replace the secondary node for DRBD 8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6

    pnode = self.instance.primary_node

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.instance.primary_node])
    self._CheckVolumeGroup([self.instance.primary_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.instance.primary_node, True, True)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg,
                                                  self.new_node_uuid)
    for idx, dev in enumerate(disks):
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        try:
          _CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance,
                               new_lv, True, GetInstanceInfoText(self.instance),
                               False, excl_stor)
        except errors.DeviceCreationError, e:
          raise errors.OpExecError("Can't create block device: %s" % e.message)

    # Step 4: drbd minors and drbd setup changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    minors = self.cfg.AllocateDRBDMinor([self.new_node_uuid
                                         for _ in self.instance.disks],
                                        self.instance.uuid)
    logging.debug("Allocated minors %r", minors)

    iv_names = {}
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
      # create new devices on new_node; note that we create two IDs:
      # one without port, so the drbd will be activated without
      # networking information on the new node at this stage, and one
      # with network, for the latter activation in step 4
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if self.instance.primary_node == o_node1:
        p_minor = o_minor1
      else:
        assert self.instance.primary_node == o_node2, "Three-node instance?"
        p_minor = o_minor2

      new_alone_id = (self.instance.primary_node, self.new_node_uuid, None,
                      p_minor, new_minor, o_secret)
      new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port,
                    p_minor, new_minor, o_secret)

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children,
                              size=dev.size,
                              params={})
      (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
                                            self.cfg)
      try:
        CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance,
                             anno_new_drbd,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)
      except errors.GenericError:
        self.cfg.ReleaseDRBDMinors(self.instance.uuid)
        raise

    # We have new devices, shutdown the drbd on the old secondary
    for idx, dev in enumerate(self.instance.disks):
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
      self.cfg.SetDiskID(dev, self.target_node_uuid)
      msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid,
                                            (dev, self.instance)).fail_msg
      if msg:
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                           " node: %s" % (idx, msg),
                           hint=("Please cleanup this device manually as"
                                 " soon as possible"))

    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
    result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
                                               self.instance.disks)[pnode]

    msg = result.fail_msg
    if msg:
      # detaches didn't succeed (unlikely)
      self.cfg.ReleaseDRBDMinors(self.instance.uuid)
      raise errors.OpExecError("Can't detach the disks from the network on"
                               " old node: %s" % (msg,))

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    self.lu.LogInfo("Updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      self.cfg.SetDiskID(dev, self.instance.primary_node)

    self.cfg.Update(self.instance, feedback_fn)

    # Release all node locks (the configuration has been updated)
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # and now perform the drbd attach
    self.lu.LogInfo("Attaching primary drbds to new secondary"
                    " (standalone => connected)")
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
                                            self.new_node_uuid],
                                           self.node_secondary_ip,
                                           (self.instance.disks, self.instance),
                                           self.instance.name,
                                           False)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
                           self.cfg.GetNodeName(to_node), msg,
                           hint=("please do a gnt-instance info to see the"
                                 " status of disks"))

    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)