lib/cmdlib/instance_storage.py @ 0247d20f
1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with storage of instances."""
23

    
24
import itertools
25
import logging
26
import os
27
import time
28

    
29
from ganeti import compat
30
from ganeti import constants
31
from ganeti import errors
32
from ganeti import ht
33
from ganeti import locking
34
from ganeti.masterd import iallocator
35
from ganeti import objects
36
from ganeti import utils
37
import ganeti.rpc.node as rpc
38
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
39
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
40
  AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \
41
  CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
42
  IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes, \
43
  CheckDiskTemplateEnabled
44
from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
45
  CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
46
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
47

    
48
import ganeti.masterd.instance
49

    
50

    
51
_DISK_TEMPLATE_NAME_PREFIX = {
52
  constants.DT_PLAIN: "",
53
  constants.DT_RBD: ".rbd",
54
  constants.DT_EXT: ".ext",
55
  constants.DT_FILE: ".file",
56
  constants.DT_SHARED_FILE: ".sharedfile",
57
  }
58

    
59

    
60
def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
61
                         excl_stor):
62
  """Create a single block device on a given node.
63

64
  This will not recurse over children of the device, so they must be
65
  created in advance.
66

67
  @param lu: the lu on whose behalf we execute
68
  @param node_uuid: the node on which to create the device
69
  @type instance: L{objects.Instance}
70
  @param instance: the instance which owns the device
71
  @type device: L{objects.Disk}
72
  @param device: the device to create
73
  @param info: the extra 'metadata' we should attach to the device
74
      (this will be represented as a LVM tag)
75
  @type force_open: boolean
76
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
80
  @type excl_stor: boolean
81
  @param excl_stor: Whether exclusive_storage is active for the node
82

83
  """
84
  result = lu.rpc.call_blockdev_create(node_uuid, (device, instance),
85
                                       device.size, instance.name, force_open,
86
                                       info, excl_stor)
87
  result.Raise("Can't create block device %s on"
88
               " node %s for instance %s" % (device,
89
                                             lu.cfg.GetNodeName(node_uuid),
90
                                             instance.name))
91

    
92

    
93
def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
94
                         info, force_open, excl_stor):
95
  """Create a tree of block devices on a given node.
96

97
  If this device type has to be created on secondaries, create it and
98
  all its children.
99

100
  If not, just recurse to children keeping the same 'force' value.
101

102
  @attention: The device has to be annotated already.
103

104
  @param lu: the lu on whose behalf we execute
105
  @param node_uuid: the node on which to create the device
106
  @type instance: L{objects.Instance}
107
  @param instance: the instance which owns the device
108
  @type device: L{objects.Disk}
109
  @param device: the device to create
110
  @type force_create: boolean
111
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has the
      CreateOnSecondary() attribute
114
  @param info: the extra 'metadata' we should attach to the device
115
      (this will be represented as a LVM tag)
116
  @type force_open: boolean
117
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
121
  @type excl_stor: boolean
122
  @param excl_stor: Whether exclusive_storage is active for the node
123

124
  @return: list of created devices
125
  """
126
  created_devices = []
127
  try:
128
    if device.CreateOnSecondary():
129
      force_create = True
130

    
131
    if device.children:
132
      for child in device.children:
133
        devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
134
                                    force_create, info, force_open, excl_stor)
135
        created_devices.extend(devs)
136

    
137
    if not force_create:
138
      return created_devices
139

    
140
    CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
141
                         excl_stor)
142
    # The device has been completely created, so there is no point in keeping
143
    # its subdevices in the list. We just add the device itself instead.
144
    created_devices = [(node_uuid, device)]
145
    return created_devices
146

    
147
  except errors.DeviceCreationError, e:
148
    e.created_devices.extend(created_devices)
149
    raise e
150
  except errors.OpExecError, e:
151
    raise errors.DeviceCreationError(str(e), created_devices)
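# Error-handling sketch (hypothetical call, mirroring what CreateDisks below
# does): the DeviceCreationError raised above carries the devices that were
# successfully created, so a caller can roll them back:
#
#   try:
#     _CreateBlockDev(lu, node_uuid, instance, device, True, info, True)
#   except errors.DeviceCreationError, err:
#     _UndoCreateDisks(lu, err.created_devices, instance)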
152

    
153

    
154
def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid):
155
  """Whether exclusive_storage is in effect for the given node.
156

157
  @type cfg: L{config.ConfigWriter}
158
  @param cfg: The cluster configuration
159
  @type node_uuid: string
160
  @param node_uuid: The node UUID
161
  @rtype: bool
162
  @return: The effective value of exclusive_storage
163
  @raise errors.OpPrereqError: if no node exists with the given name
164

165
  """
166
  ni = cfg.GetNodeInfo(node_uuid)
167
  if ni is None:
168
    raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
169
                               errors.ECODE_NOENT)
170
  return IsExclusiveStorageEnabledNode(cfg, ni)
171

    
172

    
173
def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
174
                    force_open):
175
  """Wrapper around L{_CreateBlockDevInner}.
176

177
  This method annotates the root device first.
178

179
  """
180
  (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
181
  excl_stor = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
182
  return _CreateBlockDevInner(lu, node_uuid, instance, disk, force_create, info,
183
                              force_open, excl_stor)
184

    
185

    
186
def _UndoCreateDisks(lu, disks_created, instance):
187
  """Undo the work performed by L{CreateDisks}.
188

189
  This function is called in case of an error to undo the work of
190
  L{CreateDisks}.
191

192
  @type lu: L{LogicalUnit}
193
  @param lu: the logical unit on whose behalf we execute
194
  @param disks_created: the result returned by L{CreateDisks}
195
  @type instance: L{objects.Instance}
196
  @param instance: the instance for which disks were created
197

198
  """
199
  for (node_uuid, disk) in disks_created:
200
    result = lu.rpc.call_blockdev_remove(node_uuid, (disk, instance))
201
    result.Warn("Failed to remove newly-created disk %s on node %s" %
202
                (disk, lu.cfg.GetNodeName(node_uuid)), logging.warning)
203

    
204

    
205
def CreateDisks(lu, instance, to_skip=None, target_node_uuid=None, disks=None):
206
  """Create all disks for an instance.
207

208
  This abstracts away some work from AddInstance.
209

210
  @type lu: L{LogicalUnit}
211
  @param lu: the logical unit on whose behalf we execute
212
  @type instance: L{objects.Instance}
213
  @param instance: the instance whose disks we should create
214
  @type to_skip: list
215
  @param to_skip: list of indices to skip
216
  @type target_node_uuid: string
217
  @param target_node_uuid: if passed, overrides the target node for creation
218
  @type disks: list of L{objects.Disk}
219
  @param disks: the disks to create; if not specified, all the disks of the
220
      instance are created
221
  @return: information about the created disks, to be used to call
222
      L{_UndoCreateDisks}
223
  @raise errors.OpPrereqError: in case of error
224

225
  """
226
  info = GetInstanceInfoText(instance)
227
  if target_node_uuid is None:
228
    pnode_uuid = instance.primary_node
229
    all_node_uuids = instance.all_nodes
230
  else:
231
    pnode_uuid = target_node_uuid
232
    all_node_uuids = [pnode_uuid]
233

    
234
  if disks is None:
235
    disks = instance.disks
236

    
237
  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), instance.disk_template)
238

    
239
  if instance.disk_template in constants.DTS_FILEBASED:
240
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
241
    result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir)
242

    
243
    result.Raise("Failed to create directory '%s' on"
244
                 " node %s" % (file_storage_dir,
245
                               lu.cfg.GetNodeName(pnode_uuid)))
246

    
247
  disks_created = []
248
  for idx, device in enumerate(disks):
249
    if to_skip and idx in to_skip:
250
      continue
251
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
252
    for node_uuid in all_node_uuids:
253
      f_create = node_uuid == pnode_uuid
254
      try:
255
        _CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
256
                        f_create)
257
        disks_created.append((node_uuid, device))
258
      except errors.DeviceCreationError, e:
259
        logging.warning("Creating disk %s for instance '%s' failed",
260
                        idx, instance.name)
261
        disks_created.extend(e.created_devices)
262
        _UndoCreateDisks(lu, disks_created, instance)
263
        raise errors.OpExecError(e.message)
264
  return disks_created
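# Usage sketch (hypothetical lu/instance objects): CreateDisks rolls back the
# partially created disks itself on failure, so callers only need to handle
# the final OpExecError; the returned list is what L{WipeOrCleanupDisks}
# expects for its own cleanup, e.g.:
#
#   new_disks = CreateDisks(lu, instance)
#   WipeOrCleanupDisks(lu, instance, cleanup=new_disks)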
265

    
266

    
267
def ComputeDiskSizePerVG(disk_template, disks):
268
  """Compute disk size requirements in the volume group
269

270
  """
271
  def _compute(disks, payload):
272
    """Universal algorithm.
273

274
    """
275
    vgs = {}
276
    for disk in disks:
277
      vgs[disk[constants.IDISK_VG]] = \
        vgs.get(disk[constants.IDISK_VG], 0) + \
        disk[constants.IDISK_SIZE] + payload
279

    
280
    return vgs
281

    
282
  # Required free disk space as a function of disk and swap space
283
  req_size_dict = {
284
    constants.DT_DISKLESS: {},
285
    constants.DT_PLAIN: _compute(disks, 0),
286
    # 128 MB are added for drbd metadata for each disk
287
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
288
    constants.DT_FILE: {},
289
    constants.DT_SHARED_FILE: {},
290
    constants.DT_GLUSTER: {},
291
    }
292

    
293
  if disk_template not in req_size_dict:
294
    raise errors.ProgrammerError("Disk template '%s' size requirement"
295
                                 " is unknown" % disk_template)
296

    
297
  return req_size_dict[disk_template]
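# Worked example (hypothetical values, assuming constants.DRBD_META_SIZE is
# the 128 MB mentioned above): two DRBD8 disks of 1024 MiB in volume group
# "xenvg" require 2 * (1024 + 128) MiB there:
#
#   ComputeDiskSizePerVG(constants.DT_DRBD8,
#                        [{constants.IDISK_VG: "xenvg",
#                          constants.IDISK_SIZE: 1024},
#                         {constants.IDISK_VG: "xenvg",
#                          constants.IDISK_SIZE: 1024}])
#   # -> {"xenvg": 2304}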
298

    
299

    
300
def ComputeDisks(op, default_vg):
301
  """Computes the instance disks.
302

303
  @param op: The instance opcode
304
  @param default_vg: The default_vg to assume
305

306
  @return: The computed disks
307

308
  """
309
  disks = []
310
  for disk in op.disks:
311
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
312
    if mode not in constants.DISK_ACCESS_SET:
313
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
314
                                 mode, errors.ECODE_INVAL)
315
    size = disk.get(constants.IDISK_SIZE, None)
316
    if size is None:
317
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
318
    try:
319
      size = int(size)
320
    except (TypeError, ValueError):
321
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
322
                                 errors.ECODE_INVAL)
323

    
324
    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
325
    if ext_provider and op.disk_template != constants.DT_EXT:
326
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
327
                                 " disk template, not %s" %
328
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
329
                                  op.disk_template), errors.ECODE_INVAL)
330

    
331
    data_vg = disk.get(constants.IDISK_VG, default_vg)
332
    name = disk.get(constants.IDISK_NAME, None)
333
    if name is not None and name.lower() == constants.VALUE_NONE:
334
      name = None
335
    new_disk = {
336
      constants.IDISK_SIZE: size,
337
      constants.IDISK_MODE: mode,
338
      constants.IDISK_VG: data_vg,
339
      constants.IDISK_NAME: name,
340
      }
341

    
342
    for key in [
343
      constants.IDISK_METAVG,
344
      constants.IDISK_ADOPT,
345
      constants.IDISK_SPINDLES,
346
      ]:
347
      if key in disk:
348
        new_disk[key] = disk[key]
349

    
350
    # For extstorage, demand the `provider' option and add any
351
    # additional parameters (ext-params) to the dict
352
    if op.disk_template == constants.DT_EXT:
353
      if ext_provider:
354
        new_disk[constants.IDISK_PROVIDER] = ext_provider
355
        for key in disk:
356
          if key not in constants.IDISK_PARAMS:
357
            new_disk[key] = disk[key]
358
      else:
359
        raise errors.OpPrereqError("Missing provider for template '%s'" %
360
                                   constants.DT_EXT, errors.ECODE_INVAL)
361

    
362
    disks.append(new_disk)
363

    
364
  return disks
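# Normalization sketch (hypothetical opcode values): an opcode carrying
#   op.disks = [{constants.IDISK_SIZE: 1024}]
# for a non-DT_EXT template comes back as
#   [{constants.IDISK_SIZE: 1024, constants.IDISK_MODE: constants.DISK_RDWR,
#     constants.IDISK_VG: default_vg, constants.IDISK_NAME: None}]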
365

    
366

    
367
def CheckRADOSFreeSpace():
368
  """Compute disk size requirements inside the RADOS cluster.
369

370
  """
371
  # For the RADOS cluster we assume there is always enough space.
372
  pass
373

    
374

    
375
def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
376
                         iv_name, p_minor, s_minor):
377
  """Generate a drbd8 device complete with its children.
378

379
  """
380
  assert len(vgnames) == len(names) == 2
381
  port = lu.cfg.AllocatePort()
382
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
383

    
384
  dev_data = objects.Disk(dev_type=constants.DT_PLAIN, size=size,
385
                          logical_id=(vgnames[0], names[0]),
386
                          params={})
387
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
388
  dev_meta = objects.Disk(dev_type=constants.DT_PLAIN,
389
                          size=constants.DRBD_META_SIZE,
390
                          logical_id=(vgnames[1], names[1]),
391
                          params={})
392
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
393
  drbd_dev = objects.Disk(dev_type=constants.DT_DRBD8, size=size,
394
                          logical_id=(primary_uuid, secondary_uuid, port,
395
                                      p_minor, s_minor,
396
                                      shared_secret),
397
                          children=[dev_data, dev_meta],
398
                          iv_name=iv_name, params={})
399
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
400
  return drbd_dev
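# Resulting device tree (sketch): a DT_DRBD8 disk whose logical_id is
# (primary_uuid, secondary_uuid, port, p_minor, s_minor, shared_secret) and
# whose children are two DT_PLAIN volumes:
#
#   drbd8 (size)
#   |- plain (vgnames[0], names[0])   data LV of the same size
#   `- plain (vgnames[1], names[1])   metadata LV of DRBD_META_SIZE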
401

    
402

    
403
def GenerateDiskTemplate(
404
  lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
405
  disk_info, file_storage_dir, file_driver, base_index,
406
  feedback_fn, full_disk_params):
407
  """Generate the entire disk layout for a given template type.
408

409
  """
410
  vgname = lu.cfg.GetVGName()
411
  disk_count = len(disk_info)
412
  disks = []
413

    
414
  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), template_name)
415

    
416
  if template_name == constants.DT_DISKLESS:
417
    pass
418
  elif template_name == constants.DT_DRBD8:
419
    if len(secondary_node_uuids) != 1:
420
      raise errors.ProgrammerError("Wrong template configuration")
421
    remote_node_uuid = secondary_node_uuids[0]
422
    minors = lu.cfg.AllocateDRBDMinor(
423
      [primary_node_uuid, remote_node_uuid] * len(disk_info), instance_uuid)
424

    
425
    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
426
                                                       full_disk_params)
427
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
428

    
429
    names = []
430
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
431
                                               for i in range(disk_count)]):
432
      names.append(lv_prefix + "_data")
433
      names.append(lv_prefix + "_meta")
434
    for idx, disk in enumerate(disk_info):
435
      disk_index = idx + base_index
436
      data_vg = disk.get(constants.IDISK_VG, vgname)
437
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
438
      disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
439
                                      disk[constants.IDISK_SIZE],
440
                                      [data_vg, meta_vg],
441
                                      names[idx * 2:idx * 2 + 2],
442
                                      "disk/%d" % disk_index,
443
                                      minors[idx * 2], minors[idx * 2 + 1])
444
      disk_dev.mode = disk[constants.IDISK_MODE]
445
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
446
      disks.append(disk_dev)
447
  else:
448
    if secondary_node_uuids:
449
      raise errors.ProgrammerError("Wrong template configuration")
450

    
451
    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
452
    if name_prefix is None:
453
      names = None
454
    else:
455
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
456
                                        (name_prefix, base_index + i)
457
                                        for i in range(disk_count)])
458

    
459
    if template_name == constants.DT_PLAIN:
460

    
461
      def logical_id_fn(idx, _, disk):
462
        vg = disk.get(constants.IDISK_VG, vgname)
463
        return (vg, names[idx])
464

    
465
    elif template_name == constants.DT_GLUSTER:
466
      logical_id_fn = lambda _1, disk_index, _2: \
467
        (file_driver, "ganeti/%s.%d" % (instance_uuid,
468
                                        disk_index))
469

    
470
    elif template_name in constants.DTS_FILEBASED: # Gluster handled above
471
      logical_id_fn = \
472
        lambda _, disk_index, disk: (file_driver,
473
                                     "%s/%s" % (file_storage_dir,
474
                                                names[idx]))
475
    elif template_name == constants.DT_BLOCK:
476
      logical_id_fn = \
477
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
478
                                       disk[constants.IDISK_ADOPT])
479
    elif template_name == constants.DT_RBD:
480
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
481
    elif template_name == constants.DT_EXT:
482
      def logical_id_fn(idx, _, disk):
483
        provider = disk.get(constants.IDISK_PROVIDER, None)
484
        if provider is None:
485
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
486
                                       " not found", constants.DT_EXT,
487
                                       constants.IDISK_PROVIDER)
488
        return (provider, names[idx])
489
    else:
490
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
491

    
492
    dev_type = template_name
493

    
494
    for idx, disk in enumerate(disk_info):
495
      params = {}
496
      # Only for the Ext template add disk_info to params
497
      if template_name == constants.DT_EXT:
498
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
499
        for key in disk:
500
          if key not in constants.IDISK_PARAMS:
501
            params[key] = disk[key]
502
      disk_index = idx + base_index
503
      size = disk[constants.IDISK_SIZE]
504
      feedback_fn("* disk %s, size %s" %
505
                  (disk_index, utils.FormatUnit(size, "h")))
506
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
507
                              logical_id=logical_id_fn(idx, disk_index, disk),
508
                              iv_name="disk/%d" % disk_index,
509
                              mode=disk[constants.IDISK_MODE],
510
                              params=params,
511
                              spindles=disk.get(constants.IDISK_SPINDLES))
512
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
513
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
514
      disks.append(disk_dev)
515

    
516
  return disks
517

    
518

    
519
def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
520
  """Check the presence of the spindle options with exclusive_storage.
521

522
  @type diskdict: dict
523
  @param diskdict: disk parameters
524
  @type es_flag: bool
525
  @param es_flag: the effective value of the exclusive_storage flag
526
  @type required: bool
527
  @param required: whether spindles are required or just optional
528
  @raise errors.OpPrereqError: when spindles are given and they should not be
529

530
  """
531
  if (not es_flag and constants.IDISK_SPINDLES in diskdict and
532
      diskdict[constants.IDISK_SPINDLES] is not None):
533
    raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
534
                               " when exclusive storage is not active",
535
                               errors.ECODE_INVAL)
536
  if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or
537
                                diskdict[constants.IDISK_SPINDLES] is None)):
538
    raise errors.OpPrereqError("You must specify spindles in instance disks"
539
                               " when exclusive storage is active",
540
                               errors.ECODE_INVAL)
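# Summary sketch of the checks above: with exclusive storage disabled,
# specifying spindles is rejected; with it enabled and required=True,
# omitting spindles is rejected; every other combination passes, e.g.:
#
#   CheckSpindlesExclusiveStorage({constants.IDISK_SPINDLES: 2}, True, True)
#   # passes, while the same dict with es_flag=False raises OpPrereqError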
541

    
542

    
543
class LUInstanceRecreateDisks(LogicalUnit):
544
  """Recreate an instance's missing disks.
545

546
  """
547
  HPATH = "instance-recreate-disks"
548
  HTYPE = constants.HTYPE_INSTANCE
549
  REQ_BGL = False
550

    
551
  _MODIFYABLE = compat.UniqueFrozenset([
552
    constants.IDISK_SIZE,
553
    constants.IDISK_MODE,
554
    constants.IDISK_SPINDLES,
555
    ])
556

    
557
  # New or changed disk parameters may have different semantics
558
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
559
    constants.IDISK_ADOPT,
560

    
561
    # TODO: Implement support changing VG while recreating
562
    constants.IDISK_VG,
563
    constants.IDISK_METAVG,
564
    constants.IDISK_PROVIDER,
565
    constants.IDISK_NAME,
566
    ]))
567

    
568
  def _RunAllocator(self):
569
    """Run the allocator based on input opcode.
570

571
    """
572
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
573

    
574
    # FIXME
575
    # The allocator should actually run in "relocate" mode, but current
576
    # allocators don't support relocating all the nodes of an instance at
577
    # the same time. As a workaround we use "allocate" mode, but this is
578
    # suboptimal for two reasons:
579
    # - The instance name passed to the allocator is present in the list of
580
    #   existing instances, so there could be a conflict within the
581
    #   internal structures of the allocator. This doesn't happen with the
582
    #   current allocators, but it's a liability.
583
    # - The allocator counts the resources used by the instance twice: once
584
    #   because the instance exists already, and once because it tries to
585
    #   allocate a new instance.
586
    # The allocator could choose some of the nodes on which the instance is
587
    # running, but that's not a problem. If the instance nodes are broken,
588
    # they should be already be marked as drained or offline, and hence
589
    # skipped by the allocator. If instance disks have been lost for other
590
    # reasons, then recreating the disks on the same nodes should be fine.
591
    disk_template = self.instance.disk_template
592
    spindle_use = be_full[constants.BE_SPINDLE_USE]
593
    disks = [{
594
      constants.IDISK_SIZE: d.size,
595
      constants.IDISK_MODE: d.mode,
596
      constants.IDISK_SPINDLES: d.spindles,
597
      } for d in self.instance.disks]
598
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
599
                                        disk_template=disk_template,
600
                                        tags=list(self.instance.GetTags()),
601
                                        os=self.instance.os,
602
                                        nics=[{}],
603
                                        vcpus=be_full[constants.BE_VCPUS],
604
                                        memory=be_full[constants.BE_MAXMEM],
605
                                        spindle_use=spindle_use,
606
                                        disks=disks,
607
                                        hypervisor=self.instance.hypervisor,
608
                                        node_whitelist=None)
609
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
610

    
611
    ial.Run(self.op.iallocator)
612

    
613
    assert req.RequiredNodes() == len(self.instance.all_nodes)
614

    
615
    if not ial.success:
616
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
617
                                 " %s" % (self.op.iallocator, ial.info),
618
                                 errors.ECODE_NORES)
619

    
620
    (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
621
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
622
                 self.op.instance_name, self.op.iallocator,
623
                 utils.CommaJoin(self.op.nodes))
624

    
625
  def CheckArguments(self):
626
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
627
      # Normalize and convert deprecated list of disk indices
628
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
629

    
630
    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
631
    if duplicates:
632
      raise errors.OpPrereqError("Some disks have been specified more than"
633
                                 " once: %s" % utils.CommaJoin(duplicates),
634
                                 errors.ECODE_INVAL)
635

    
636
    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
637
    # when neither iallocator nor nodes are specified
638
    if self.op.iallocator or self.op.nodes:
639
      CheckIAllocatorOrNode(self, "iallocator", "nodes")
640

    
641
    for (idx, params) in self.op.disks:
642
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
643
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
644
      if unsupported:
645
        raise errors.OpPrereqError("Parameters for disk %s try to change"
646
                                   " unmodifyable parameter(s): %s" %
647
                                   (idx, utils.CommaJoin(unsupported)),
648
                                   errors.ECODE_INVAL)
649

    
650
  def ExpandNames(self):
651
    self._ExpandAndLockInstance()
652
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
653

    
654
    if self.op.nodes:
655
      (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
656
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)
657
    else:
658
      self.needed_locks[locking.LEVEL_NODE] = []
659
      if self.op.iallocator:
660
        # iallocator will select a new node in the same group
661
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
662
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
663

    
664
    self.needed_locks[locking.LEVEL_NODE_RES] = []
665

    
666
  def DeclareLocks(self, level):
667
    if level == locking.LEVEL_NODEGROUP:
668
      assert self.op.iallocator is not None
669
      assert not self.op.nodes
670
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
671
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
672
      # Lock the primary group used by the instance optimistically; this
673
      # requires going via the node before it's locked, requiring
674
      # verification later on
675
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
676
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, primary_only=True)
677

    
678
    elif level == locking.LEVEL_NODE:
679
      # If an allocator is used, then we lock all the nodes in the current
680
      # instance group, as we don't know yet which ones will be selected;
681
      # if we replace the nodes without using an allocator, locks are
682
      # already declared in ExpandNames; otherwise, we need to lock all the
683
      # instance nodes for disk re-creation
684
      if self.op.iallocator:
685
        assert not self.op.nodes
686
        assert not self.needed_locks[locking.LEVEL_NODE]
687
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
688

    
689
        # Lock member nodes of the group of the primary node
690
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
691
          self.needed_locks[locking.LEVEL_NODE].extend(
692
            self.cfg.GetNodeGroup(group_uuid).members)
693

    
694
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
695
      elif not self.op.nodes:
696
        self._LockInstancesNodes(primary_only=False)
697
    elif level == locking.LEVEL_NODE_RES:
698
      # Copy node locks
699
      self.needed_locks[locking.LEVEL_NODE_RES] = \
700
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])
701

    
702
  def BuildHooksEnv(self):
703
    """Build hooks env.
704

705
    This runs on master, primary and secondary nodes of the instance.
706

707
    """
708
    return BuildInstanceHookEnvByObject(self, self.instance)
709

    
710
  def BuildHooksNodes(self):
711
    """Build hooks nodes.
712

713
    """
714
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
715
    return (nl, nl)
716

    
717
  def CheckPrereq(self):
718
    """Check prerequisites.
719

720
    This checks that the instance is in the cluster and is not running.
721

722
    """
723
    instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
724
    assert instance is not None, \
725
      "Cannot retrieve locked instance %s" % self.op.instance_name
726
    if self.op.node_uuids:
727
      if len(self.op.node_uuids) != len(instance.all_nodes):
728
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
729
                                   " %d replacement nodes were specified" %
730
                                   (instance.name, len(instance.all_nodes),
731
                                    len(self.op.node_uuids)),
732
                                   errors.ECODE_INVAL)
733
      assert instance.disk_template != constants.DT_DRBD8 or \
734
             len(self.op.node_uuids) == 2
735
      assert instance.disk_template != constants.DT_PLAIN or \
736
             len(self.op.node_uuids) == 1
737
      primary_node = self.op.node_uuids[0]
738
    else:
739
      primary_node = instance.primary_node
740
    if not self.op.iallocator:
741
      CheckNodeOnline(self, primary_node)
742

    
743
    if instance.disk_template == constants.DT_DISKLESS:
744
      raise errors.OpPrereqError("Instance '%s' has no disks" %
745
                                 self.op.instance_name, errors.ECODE_INVAL)
746

    
747
    # Verify if node group locks are still correct
748
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
749
    if owned_groups:
750
      # Node group locks are acquired only for the primary node (and only
751
      # when the allocator is used)
752
      CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
753
                              primary_only=True)
754

    
755
    # if we replace nodes *and* the old primary is offline, we don't
756
    # check the instance state
757
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
758
    if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
759
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
760
                         msg="cannot recreate disks")
761

    
762
    if self.op.disks:
763
      self.disks = dict(self.op.disks)
764
    else:
765
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
766

    
767
    maxidx = max(self.disks.keys())
768
    if maxidx >= len(instance.disks):
769
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
770
                                 errors.ECODE_INVAL)
771

    
772
    if ((self.op.node_uuids or self.op.iallocator) and
773
         sorted(self.disks.keys()) != range(len(instance.disks))):
774
      raise errors.OpPrereqError("Can't recreate disks partially and"
775
                                 " change the nodes at the same time",
776
                                 errors.ECODE_INVAL)
777

    
778
    self.instance = instance
779

    
780
    if self.op.iallocator:
781
      self._RunAllocator()
782
      # Release unneeded node and node resource locks
783
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
784
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
785
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
786

    
787
    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
788

    
789
    if self.op.node_uuids:
790
      node_uuids = self.op.node_uuids
791
    else:
792
      node_uuids = instance.all_nodes
793
    excl_stor = compat.any(
794
      rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
795
      )
796
    for new_params in self.disks.values():
797
      CheckSpindlesExclusiveStorage(new_params, excl_stor, False)
798

    
799
  def Exec(self, feedback_fn):
800
    """Recreate the disks.
801

802
    """
803
    assert (self.owned_locks(locking.LEVEL_NODE) ==
804
            self.owned_locks(locking.LEVEL_NODE_RES))
805

    
806
    to_skip = []
807
    mods = [] # keeps track of needed changes
808

    
809
    for idx, disk in enumerate(self.instance.disks):
810
      try:
811
        changes = self.disks[idx]
812
      except KeyError:
813
        # Disk should not be recreated
814
        to_skip.append(idx)
815
        continue
816

    
817
      # update secondaries for disks, if needed
818
      if self.op.node_uuids and disk.dev_type == constants.DT_DRBD8:
819
        # need to update the nodes and minors
820
        assert len(self.op.node_uuids) == 2
821
        assert len(disk.logical_id) == 6 # otherwise disk internals
822
                                         # have changed
823
        (_, _, old_port, _, _, old_secret) = disk.logical_id
824
        new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
825
                                                self.instance.uuid)
826
        new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
827
                  new_minors[0], new_minors[1], old_secret)
828
        assert len(disk.logical_id) == len(new_id)
829
      else:
830
        new_id = None
831

    
832
      mods.append((idx, new_id, changes))
833

    
834
    # now that we have passed all asserts above, we can apply the mods
835
    # in a single run (to avoid partial changes)
836
    for idx, new_id, changes in mods:
837
      disk = self.instance.disks[idx]
838
      if new_id is not None:
839
        assert disk.dev_type == constants.DT_DRBD8
840
        disk.logical_id = new_id
841
      if changes:
842
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
843
                    mode=changes.get(constants.IDISK_MODE, None),
844
                    spindles=changes.get(constants.IDISK_SPINDLES, None))
845

    
846
    # change primary node, if needed
847
    if self.op.node_uuids:
848
      self.instance.primary_node = self.op.node_uuids[0]
849
      self.LogWarning("Changing the instance's nodes, you will have to"
850
                      " remove any disks left on the older nodes manually")
851

    
852
    if self.op.node_uuids:
853
      self.cfg.Update(self.instance, feedback_fn)
854

    
855
    # All touched nodes must be locked
856
    mylocks = self.owned_locks(locking.LEVEL_NODE)
857
    assert mylocks.issuperset(frozenset(self.instance.all_nodes))
858
    new_disks = CreateDisks(self, self.instance, to_skip=to_skip)
859

    
860
    # TODO: Release node locks before wiping, or explain why it's not possible
861
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
862
      wipedisks = [(idx, disk, 0)
863
                   for (idx, disk) in enumerate(self.instance.disks)
864
                   if idx not in to_skip]
865
      WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
866
                         cleanup=new_disks)
867

    
868

    
869
def _PerformNodeInfoCall(lu, node_uuids, vg):
870
  """Prepares the input and performs a node info call.
871

872
  @type lu: C{LogicalUnit}
873
  @param lu: a logical unit from which we get configuration data
874
  @type node_uuids: list of string
875
  @param node_uuids: list of node UUIDs to perform the call for
876
  @type vg: string
877
  @param vg: the volume group's name
878

879
  """
880
  lvm_storage_units = [(constants.ST_LVM_VG, vg)]
881
  storage_units = rpc.PrepareStorageUnitsForNodes(lu.cfg, lvm_storage_units,
882
                                                  node_uuids)
883
  hvname = lu.cfg.GetHypervisorType()
884
  hvparams = lu.cfg.GetClusterInfo().hvparams
885
  nodeinfo = lu.rpc.call_node_info(node_uuids, storage_units,
886
                                   [(hvname, hvparams[hvname])])
887
  return nodeinfo
888

    
889

    
890
def _CheckVgCapacityForNode(node_name, node_info, vg, requested):
891
  """Checks the vg capacity for a given node.
892

893
  @type node_info: tuple (_, list of dicts, _)
894
  @param node_info: the result of the node info call for one node
895
  @type node_name: string
896
  @param node_name: the name of the node
897
  @type vg: string
898
  @param vg: volume group name
899
  @type requested: int
900
  @param requested: the amount of disk in MiB to check for
901
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
902
      or we cannot check the node
903

904
  """
905
  (_, space_info, _) = node_info
906
  lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType(
907
      space_info, constants.ST_LVM_VG)
908
  if not lvm_vg_info:
909
    raise errors.OpPrereqError("Can't retrieve storage information for LVM")
910
  vg_free = lvm_vg_info.get("storage_free", None)
911
  if not isinstance(vg_free, int):
912
    raise errors.OpPrereqError("Can't compute free disk space on node"
913
                               " %s for vg %s, result was '%s'" %
914
                               (node_name, vg, vg_free), errors.ECODE_ENVIRON)
915
  if requested > vg_free:
916
    raise errors.OpPrereqError("Not enough disk space on target node %s"
917
                               " vg %s: required %d MiB, available %d MiB" %
918
                               (node_name, vg, requested, vg_free),
919
                               errors.ECODE_NORES)
920

    
921

    
922
def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
923
  """Checks if nodes have enough free disk space in the specified VG.
924

925
  This function checks if all given nodes have the needed amount of
926
  free disk. In case any node has less disk or we cannot get the
927
  information from the node, this function raises an OpPrereqError
928
  exception.
929

930
  @type lu: C{LogicalUnit}
931
  @param lu: a logical unit from which we get configuration data
932
  @type node_uuids: C{list}
933
  @param node_uuids: the list of node UUIDs to check
934
  @type vg: C{str}
935
  @param vg: the volume group to check
936
  @type requested: C{int}
937
  @param requested: the amount of disk in MiB to check for
938
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
939
      or we cannot check the node
940

941
  """
942
  nodeinfo = _PerformNodeInfoCall(lu, node_uuids, vg)
943
  for node_uuid in node_uuids:
944
    node_name = lu.cfg.GetNodeName(node_uuid)
945
    info = nodeinfo[node_uuid]
946
    info.Raise("Cannot get current information from node %s" % node_name,
947
               prereq=True, ecode=errors.ECODE_ENVIRON)
948
    _CheckVgCapacityForNode(node_name, info.payload, vg, requested)
949

    
950

    
951
def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
952
  """Checks if nodes have enough free disk space in all the VGs.
953

954
  This function checks if all given nodes have the needed amount of
955
  free disk. In case any node has less disk or we cannot get the
956
  information from the node, this function raises an OpPrereqError
957
  exception.
958

959
  @type lu: C{LogicalUnit}
960
  @param lu: a logical unit from which we get configuration data
961
  @type node_uuids: C{list}
962
  @param node_uuids: the list of node UUIDs to check
963
  @type req_sizes: C{dict}
964
  @param req_sizes: the hash of vg and corresponding amount of disk in
965
      MiB to check for
966
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
967
      or we cannot check the node
968

969
  """
970
  for vg, req_size in req_sizes.items():
971
    _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, req_size)
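# Usage sketch (hypothetical values): req_sizes is typically the mapping
# returned by L{ComputeDiskSizePerVG}, e.g. requiring 2304 MiB of "xenvg" on
# all nodes of an instance:
#
#   CheckNodesFreeDiskPerVG(lu, instance.all_nodes, {"xenvg": 2304})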
972

    
973

    
974
def _DiskSizeInBytesToMebibytes(lu, size):
975
  """Converts a disk size in bytes to mebibytes.
976

977
  Warns and rounds up if the size isn't an even multiple of 1 MiB.
978

979
  """
980
  (mib, remainder) = divmod(size, 1024 * 1024)
981

    
982
  if remainder != 0:
983
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
984
                  " to not overwrite existing data (%s bytes will not be"
985
                  " wiped)", (1024 * 1024) - remainder)
986
    mib += 1
987

    
988
  return mib
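# Worked example: 1073741825 bytes (1 GiB plus one byte) gives
# divmod(size, 1024 * 1024) == (1024, 1), so the function warns that
# 1048575 bytes will not be wiped and returns 1025 (MiB).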
989

    
990

    
991
def _CalcEta(time_taken, written, total_size):
992
  """Calculates the ETA based on size written and total size.
993

994
  @param time_taken: The time taken so far
995
  @param written: amount written so far
996
  @param total_size: The total size of data to be written
997
  @return: The remaining time in seconds
998

999
  """
1000
  avg_time = time_taken / float(written)
1001
  return (total_size - written) * avg_time
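# Worked example: if 512 MiB of a 2048 MiB disk were written in 30 seconds,
# avg_time is 30 / 512.0 and the ETA is (2048 - 512) * (30 / 512.0),
# i.e. 90 seconds.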
1002

    
1003

    
1004
def WipeDisks(lu, instance, disks=None):
1005
  """Wipes instance disks.
1006

1007
  @type lu: L{LogicalUnit}
1008
  @param lu: the logical unit on whose behalf we execute
1009
  @type instance: L{objects.Instance}
1010
  @param instance: the instance whose disks we should wipe
1011
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
1012
  @param disks: Disk details; tuple contains disk index, disk object and the
1013
    start offset
1014

1015
  """
1016
  node_uuid = instance.primary_node
1017
  node_name = lu.cfg.GetNodeName(node_uuid)
1018

    
1019
  if disks is None:
1020
    disks = [(idx, disk, 0)
1021
             for (idx, disk) in enumerate(instance.disks)]
1022

    
1023
  logging.info("Pausing synchronization of disks of instance '%s'",
1024
               instance.name)
1025
  result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1026
                                                  (map(compat.snd, disks),
1027
                                                   instance),
1028
                                                  True)
1029
  result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)
1030

    
1031
  for idx, success in enumerate(result.payload):
1032
    if not success:
1033
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
1034
                   " failed", idx, instance.name)
1035

    
1036
  try:
1037
    for (idx, device, offset) in disks:
1038
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
1039
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
1040
      wipe_chunk_size = \
1041
        int(min(constants.MAX_WIPE_CHUNK,
1042
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
1043

    
1044
      size = device.size
1045
      last_output = 0
1046
      start_time = time.time()
1047

    
1048
      if offset == 0:
1049
        info_text = ""
1050
      else:
1051
        info_text = (" (from %s to %s)" %
1052
                     (utils.FormatUnit(offset, "h"),
1053
                      utils.FormatUnit(size, "h")))
1054

    
1055
      lu.LogInfo("* Wiping disk %s%s", idx, info_text)
1056

    
1057
      logging.info("Wiping disk %d for instance %s on node %s using"
1058
                   " chunk size %s", idx, instance.name, node_name,
1059
                   wipe_chunk_size)
1060

    
1061
      while offset < size:
1062
        wipe_size = min(wipe_chunk_size, size - offset)
1063

    
1064
        logging.debug("Wiping disk %d, offset %s, chunk %s",
1065
                      idx, offset, wipe_size)
1066

    
1067
        result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
1068
                                           offset, wipe_size)
1069
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
1070
                     (idx, offset, wipe_size))
1071

    
1072
        now = time.time()
1073
        offset += wipe_size
1074
        if now - last_output >= 60:
1075
          eta = _CalcEta(now - start_time, offset, size)
1076
          lu.LogInfo(" - done: %.1f%% ETA: %s",
1077
                     offset / float(size) * 100, utils.FormatSeconds(eta))
1078
          last_output = now
1079
  finally:
1080
    logging.info("Resuming synchronization of disks for instance '%s'",
1081
                 instance.name)
1082

    
1083
    result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1084
                                                    (map(compat.snd, disks),
1085
                                                     instance),
1086
                                                    False)
1087

    
1088
    if result.fail_msg:
1089
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
1090
                    node_name, result.fail_msg)
1091
    else:
1092
      for idx, success in enumerate(result.payload):
1093
        if not success:
1094
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
1095
                        " failed", idx, instance.name)
1096

    
1097

    
1098
def ImageDisks(lu, instance, image, disks=None):
1099
  """Dumps an image onto an instance disk.
1100

1101
  @type lu: L{LogicalUnit}
1102
  @param lu: the logical unit on whose behalf we execute
1103
  @type instance: L{objects.Instance}
1104
  @param instance: the instance whose disks we should image
  @type image: string
  @param image: the image to dump onto the instance's disks
1107
  @type disks: None or list of ints
1108
  @param disks: disk indices
1109

1110
  """
1111
  node_uuid = instance.primary_node
1112
  node_name = lu.cfg.GetNodeName(node_uuid)
1113

    
1114
  if disks is None:
1115
    disks = [(0, instance.disks[0])]
1116
  else:
1117
    disks = map(lambda idx: (idx, instance.disks[idx]), disks)
1118

    
1119
  logging.info("Pausing synchronization of disks of instance '%s'",
1120
               instance.name)
1121
  result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1122
                                                  (map(compat.snd, disks),
1123
                                                   instance),
1124
                                                  True)
1125
  result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)
1126

    
1127
  for idx, success in enumerate(result.payload):
1128
    if not success:
1129
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
1130
                   " failed", idx, instance.name)
1131

    
1132
  try:
1133
    for (idx, device) in disks:
1134
      lu.LogInfo("Imaging disk '%d' for instance '%s' on node '%s'",
1135
                 idx, instance.name, node_name)
1136

    
1137
      result = lu.rpc.call_blockdev_image(node_uuid, (device, instance),
1138
                                          image, device.size)
1139
      result.Raise("Could not image disk '%d' for instance '%s' on node '%s'" %
1140
                   (idx, instance.name, node_name))
1141
  finally:
1142
    logging.info("Resuming synchronization of disks for instance '%s'",
1143
                 instance.name)
1144

    
1145
    result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1146
                                                    (map(compat.snd, disks),
1147
                                                     instance),
1148
                                                    False)
1149

    
1150
    if result.fail_msg:
1151
      lu.LogWarning("Failed to resume disk synchronization for instance '%s' on"
1152
                    " node '%s'", node_name, result.fail_msg)
1153
    else:
1154
      for idx, success in enumerate(result.payload):
1155
        if not success:
1156
          lu.LogWarning("Failed to resume synchronization of disk '%d' of"
1157
                        " instance '%s'", idx, instance.name)
1158

    
1159

    
1160
def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
1161
  """Wrapper for L{WipeDisks} that handles errors.
1162

1163
  @type lu: L{LogicalUnit}
1164
  @param lu: the logical unit on whose behalf we execute
1165
  @type instance: L{objects.Instance}
1166
  @param instance: the instance whose disks we should wipe
1167
  @param disks: see L{WipeDisks}
1168
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
1169
      case of error
1170
  @raise errors.OpPrereqError: in case of failure
1171

1172
  """
1173
  try:
1174
    WipeDisks(lu, instance, disks=disks)
1175
  except errors.OpExecError:
1176
    logging.warning("Wiping disks for instance '%s' failed",
1177
                    instance.name)
1178
    _UndoCreateDisks(lu, cleanup, instance)
1179
    raise
1180

    
1181

    
1182
def ExpandCheckDisks(instance, disks):
1183
  """Return the instance disks selected by the disks list
1184

1185
  @type disks: list of L{objects.Disk} or None
1186
  @param disks: selected disks
1187
  @rtype: list of L{objects.Disk}
1188
  @return: selected instance disks to act on
1189

1190
  """
1191
  if disks is None:
1192
    return instance.disks
1193
  else:
1194
    if not set(disks).issubset(instance.disks):
1195
      raise errors.ProgrammerError("Can only act on disks belonging to the"
1196
                                   " target instance: expected a subset of %r,"
1197
                                   " got %r" % (instance.disks, disks))
1198
    return disks
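# Usage sketch: callers pass either disks=None to act on all instance disks
# or a subset of instance.disks:
#
#   ExpandCheckDisks(instance, None)                # -> instance.disks
#   ExpandCheckDisks(instance, instance.disks[:1])  # -> just the first disk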
1199

    
1200

    
1201
def WaitForSync(lu, instance, disks=None, oneshot=False):
1202
  """Sleep and poll for an instance's disk to sync.
1203

1204
  """
1205
  if not instance.disks or disks is not None and not disks:
1206
    return True
1207

    
1208
  disks = ExpandCheckDisks(instance, disks)
1209

    
1210
  if not oneshot:
1211
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1212

    
1213
  node_uuid = instance.primary_node
1214
  node_name = lu.cfg.GetNodeName(node_uuid)
1215

    
1216
  # TODO: Convert to utils.Retry
1217

    
1218
  retries = 0
1219
  degr_retries = 10 # in seconds, as we sleep 1 second each time
1220
  while True:
1221
    max_time = 0
1222
    done = True
1223
    cumul_degraded = False
1224
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
1225
    msg = rstats.fail_msg
1226
    if msg:
1227
      lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
1228
      retries += 1
1229
      if retries >= 10:
1230
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1231
                                 " aborting." % node_name)
1232
      time.sleep(6)
1233
      continue
1234
    rstats = rstats.payload
1235
    retries = 0
1236
    for i, mstat in enumerate(rstats):
1237
      if mstat is None:
1238
        lu.LogWarning("Can't compute data for node %s/%s",
1239
                      node_name, disks[i].iv_name)
1240
        continue
1241

    
1242
      cumul_degraded = (cumul_degraded or
1243
                        (mstat.is_degraded and mstat.sync_percent is None))
1244
      if mstat.sync_percent is not None:
1245
        done = False
1246
        if mstat.estimated_time is not None:
1247
          rem_time = ("%s remaining (estimated)" %
1248
                      utils.FormatSeconds(mstat.estimated_time))
1249
          max_time = mstat.estimated_time
1250
        else:
1251
          rem_time = "no time estimate"
1252
        lu.LogInfo("- device %s: %5.2f%% done, %s",
1253
                   disks[i].iv_name, mstat.sync_percent, rem_time)
1254

    
1255
    # if we're done but degraded, let's do a few small retries, to
1256
    # make sure we see a stable and not transient situation; therefore
1257
    # we force restart of the loop
1258
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
1259
      logging.info("Degraded disks found, %d retries left", degr_retries)
1260
      degr_retries -= 1
1261
      time.sleep(1)
1262
      continue
1263

    
1264
    if done or oneshot:
1265
      break
1266

    
1267
    time.sleep(min(60, max_time))
1268

    
1269
  if done:
1270
    lu.LogInfo("Instance %s's disks are in sync", instance.name)
1271

    
1272
  return not cumul_degraded
1273

    
1274

    
1275
def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1276
  """Shutdown block devices of an instance.
1277

1278
  This does the shutdown on all nodes of the instance.
1279

1280
  If ignore_primary is true, errors on the primary node are ignored;
  otherwise they cause the function to return False. Errors on offline
  secondary nodes are always ignored.
1282

1283
  """
1284
  all_result = True
1285

    
1286
  if disks is None:
1287
    # only mark instance disks as inactive if all disks are affected
1288
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1289
  disks = ExpandCheckDisks(instance, disks)
1290

    
1291
  for disk in disks:
1292
    for node_uuid, top_disk in disk.ComputeNodeTree(instance.primary_node):
1293
      result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
1294
      msg = result.fail_msg
1295
      if msg:
1296
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1297
                      disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1298
        if ((node_uuid == instance.primary_node and not ignore_primary) or
1299
            (node_uuid != instance.primary_node and not result.offline)):
1300
          all_result = False
1301
  return all_result
1302

    
1303

    
1304
def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1305
  """Shutdown block devices of an instance.
1306

1307
  This function checks that the instance is down before calling
  L{ShutdownInstanceDisks}.
1309

1310
  """
1311
  CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1312
  ShutdownInstanceDisks(lu, instance, disks=disks)
1313

    
1314

    
1315
def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
                          ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type disks: list of L{objects.Disk} or None
  @param disks: which disks to assemble (or all, if None)
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: a pair (disks_ok, device_info), where disks_ok is False if the
      operation failed, and device_info is a list of
      (host, instance_visible_name, node_visible_name) tuples with the
      mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True

  if disks is None:
    # only mark instance disks as active if all disks are affected
    lu.cfg.MarkInstanceDisksActive(instance.uuid)

  disks = ExpandCheckDisks(instance, disks)

  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for idx, inst_disk in enumerate(disks):
    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
                                  instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                             instance.name, False, idx)
      msg = result.fail_msg
      if msg:
        is_offline_secondary = (node_uuid in instance.secondary_nodes and
                                result.offline)
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=False, pass=1): %s",
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        if not (ignore_secondaries or is_offline_secondary):
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for idx, inst_disk in enumerate(disks):
    dev_path = None

    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
                                  instance.primary_node):
      if node_uuid != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                             instance.name, True, idx)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=True, pass=2): %s",
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        disks_ok = False
      else:
        dev_path, _ = result.payload

    device_info.append((lu.cfg.GetNodeName(instance.primary_node),
                        inst_disk.iv_name, dev_path))

  if not disks_ok:
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)

  return disks_ok, device_info


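# Illustrative sketch (hypothetical caller): the second element of the
# return value maps every disk to its device path on the primary node:
#
#   disks_ok, device_info = AssembleInstanceDisks(self, instance)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")
#   for node_name, iv_name, dev_path in device_info:
#     feedback_fn("%s on %s is %s" % (iv_name, node_name, dev_path))
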
def StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = AssembleInstanceDisks(lu, instance,
                                      ignore_secondaries=force)
  if not disks_ok:
    ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.LogWarning("",
                    hint=("If the message above refers to a secondary node,"
                          " you can retry the operation using '--force'"))
    raise errors.OpExecError("Disk consistency error")


class LUInstanceGrowDisk(LogicalUnit):
1425
  """Grow a disk of an instance.
1426

1427
  """
1428
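  # A grow request reaches this LU as an opcode roughly of the form below
  # (sketch; the field names mirror the self.op attributes used in this
  # class, the instance name is hypothetical):
  #
  #   OpInstanceGrowDisk(instance_name="inst1.example.com", disk=0,
  #                      amount=10240, absolute=False, wait_for_sync=True)
  #
  # CheckPrereq normalizes amount/absolute into self.delta and self.target,
  # and Exec performs a dry-run grow on all nodes before the real one.
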
  HPATH = "disk-grow"
1429
  HTYPE = constants.HTYPE_INSTANCE
1430
  REQ_BGL = False
1431

    
1432
  def ExpandNames(self):
1433
    self._ExpandAndLockInstance()
1434
    self.needed_locks[locking.LEVEL_NODE] = []
1435
    self.needed_locks[locking.LEVEL_NODE_RES] = []
1436
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1437
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
1438

    
1439
  def DeclareLocks(self, level):
1440
    if level == locking.LEVEL_NODE:
1441
      self._LockInstancesNodes()
1442
    elif level == locking.LEVEL_NODE_RES:
1443
      # Copy node locks
1444
      self.needed_locks[locking.LEVEL_NODE_RES] = \
1445
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1446

    
1447
  def BuildHooksEnv(self):
1448
    """Build hooks env.
1449

1450
    This runs on the master, the primary and all the secondaries.
1451

1452
    """
1453
    env = {
1454
      "DISK": self.op.disk,
1455
      "AMOUNT": self.op.amount,
1456
      "ABSOLUTE": self.op.absolute,
1457
      }
1458
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
1459
    return env
1460

    
1461
  def BuildHooksNodes(self):
1462
    """Build hooks nodes.
1463

1464
    """
1465
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1466
    return (nl, nl)
1467

    
1468
  def CheckPrereq(self):
1469
    """Check prerequisites.
1470

1471
    This checks that the instance is in the cluster.
1472

1473
    """
1474
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1475
    assert self.instance is not None, \
1476
      "Cannot retrieve locked instance %s" % self.op.instance_name
1477
    node_uuids = list(self.instance.all_nodes)
1478
    for node_uuid in node_uuids:
1479
      CheckNodeOnline(self, node_uuid)
1480
    self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)
1481

    
1482
    if self.instance.disk_template not in constants.DTS_GROWABLE:
1483
      raise errors.OpPrereqError("Instance's disk layout does not support"
1484
                                 " growing", errors.ECODE_INVAL)
1485

    
1486
    self.disk = self.instance.FindDisk(self.op.disk)
1487

    
1488
    if self.op.absolute:
1489
      self.target = self.op.amount
1490
      self.delta = self.target - self.disk.size
1491
      if self.delta < 0:
1492
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
1493
                                   "current disk size (%s)" %
1494
                                   (utils.FormatUnit(self.target, "h"),
1495
                                    utils.FormatUnit(self.disk.size, "h")),
1496
                                   errors.ECODE_STATE)
1497
    else:
1498
      self.delta = self.op.amount
1499
      self.target = self.disk.size + self.delta
1500
      if self.delta < 0:
1501
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
1502
                                   utils.FormatUnit(self.delta, "h"),
1503
                                   errors.ECODE_INVAL)
1504

    
1505
    self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta))
1506

    
1507
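  # Worked example (illustrative numbers, sizes in MiB): for a 10240 MiB
  # disk, a relative request with amount=10240 yields delta=10240 and
  # target=20480; an absolute request with amount=20480 yields the same
  # delta/target pair. Both are then checked against free space below.
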
  def _CheckDiskSpace(self, node_uuids, req_vgspace):
1508
    template = self.instance.disk_template
1509
    if (template not in (constants.DTS_NO_FREE_SPACE_CHECK) and
1510
        not any(self.node_es_flags.values())):
1511
      # TODO: check the free disk space for file, when that feature will be
1512
      # supported
1513
      # With exclusive storage we need to do something smarter than just looking
1514
      # at free space, which, in the end, is basically a dry run. So we rely on
1515
      # the dry run performed in Exec() instead.
1516
      CheckNodesFreeDiskPerVG(self, node_uuids, req_vgspace)
1517

    
1518
  def Exec(self, feedback_fn):
1519
    """Execute disk grow.
1520

1521
    """
1522
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1523
    assert (self.owned_locks(locking.LEVEL_NODE) ==
1524
            self.owned_locks(locking.LEVEL_NODE_RES))
1525

    
1526
    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1527

    
1528
    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[self.disk])
1529
    if not disks_ok:
1530
      raise errors.OpExecError("Cannot activate block device to grow")
1531

    
1532
    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1533
                (self.op.disk, self.instance.name,
1534
                 utils.FormatUnit(self.delta, "h"),
1535
                 utils.FormatUnit(self.target, "h")))
1536

    
1537
    # First run all grow ops in dry-run mode
1538
    for node_uuid in self.instance.all_nodes:
1539
      result = self.rpc.call_blockdev_grow(node_uuid,
1540
                                           (self.disk, self.instance),
1541
                                           self.delta, True, True,
1542
                                           self.node_es_flags[node_uuid])
1543
      result.Raise("Dry-run grow request failed to node %s" %
1544
                   self.cfg.GetNodeName(node_uuid))
1545

    
1546
    if wipe_disks:
1547
      # Get disk size from primary node for wiping
1548
      result = self.rpc.call_blockdev_getdimensions(
1549
                 self.instance.primary_node, [([self.disk], self.instance)])
1550
      result.Raise("Failed to retrieve disk size from node '%s'" %
1551
                   self.instance.primary_node)
1552

    
1553
      (disk_dimensions, ) = result.payload
1554

    
1555
      if disk_dimensions is None:
1556
        raise errors.OpExecError("Failed to retrieve disk size from primary"
1557
                                 " node '%s'" % self.instance.primary_node)
1558
      (disk_size_in_bytes, _) = disk_dimensions
1559

    
1560
      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
1561

    
1562
      assert old_disk_size >= self.disk.size, \
1563
        ("Retrieved disk size too small (got %s, should be at least %s)" %
1564
         (old_disk_size, self.disk.size))
1565
    else:
1566
      old_disk_size = None
1567

    
1568
    # We know that (as far as we can test) operations across different
1569
    # nodes will succeed, time to run it for real on the backing storage
1570
    for node_uuid in self.instance.all_nodes:
1571
      result = self.rpc.call_blockdev_grow(node_uuid,
1572
                                           (self.disk, self.instance),
1573
                                           self.delta, False, True,
1574
                                           self.node_es_flags[node_uuid])
1575
      result.Raise("Grow request failed to node %s" %
1576
                   self.cfg.GetNodeName(node_uuid))
1577

    
1578
    # And now execute it for logical storage, on the primary node
1579
    node_uuid = self.instance.primary_node
1580
    result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance),
1581
                                         self.delta, False, False,
1582
                                         self.node_es_flags[node_uuid])
1583
    result.Raise("Grow request failed to node %s" %
1584
                 self.cfg.GetNodeName(node_uuid))
1585

    
1586
    self.disk.RecordGrow(self.delta)
1587
    self.cfg.Update(self.instance, feedback_fn)
1588

    
1589
    # Changes have been recorded, release node lock
1590
    ReleaseLocks(self, locking.LEVEL_NODE)
1591

    
1592
    # Downgrade lock while waiting for sync
1593
    self.glm.downgrade(locking.LEVEL_INSTANCE)
1594

    
1595
    assert wipe_disks ^ (old_disk_size is None)
1596

    
1597
    if wipe_disks:
1598
      assert self.instance.disks[self.op.disk] == self.disk
1599

    
1600
      # Wipe newly added disk space
1601
      WipeDisks(self, self.instance,
1602
                disks=[(self.op.disk, self.disk, old_disk_size)])
1603

    
1604
    if self.op.wait_for_sync:
1605
      disk_abort = not WaitForSync(self, self.instance, disks=[self.disk])
1606
      if disk_abort:
1607
        self.LogWarning("Disk syncing has not returned a good status; check"
1608
                        " the instance")
1609
      if not self.instance.disks_active:
1610
        _SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk])
1611
    elif not self.instance.disks_active:
1612
      self.LogWarning("Not shutting down the disk even if the instance is"
1613
                      " not supposed to be running because no wait for"
1614
                      " sync mode was requested")
1615

    
1616
    assert self.owned_locks(locking.LEVEL_NODE_RES)
1617
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1618

    
1619

    
1620
class LUInstanceReplaceDisks(LogicalUnit):
1621
  """Replace the disks of an instance.
1622

1623
  """
1624
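  # Mode summary (sketch of the behaviour implemented in CheckArguments and
  # TLReplaceDisks below):
  #   constants.REPLACE_DISK_PRI  - recreate mirrors on the primary node
  #   constants.REPLACE_DISK_SEC  - recreate mirrors on the secondary node
  #   constants.REPLACE_DISK_AUTO - repair whichever node reports faulty disks
  #   constants.REPLACE_DISK_CHG  - move the mirror to a new secondary node,
  #                                 given via remote_node or an iallocator
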
  HPATH = "mirrors-replace"
1625
  HTYPE = constants.HTYPE_INSTANCE
1626
  REQ_BGL = False
1627

    
1628
  def CheckArguments(self):
1629
    """Check arguments.
1630

1631
    """
1632
    if self.op.mode == constants.REPLACE_DISK_CHG:
1633
      if self.op.remote_node is None and self.op.iallocator is None:
1634
        raise errors.OpPrereqError("When changing the secondary either an"
1635
                                   " iallocator script must be used or the"
1636
                                   " new node given", errors.ECODE_INVAL)
1637
      else:
1638
        CheckIAllocatorOrNode(self, "iallocator", "remote_node")
1639

    
1640
    elif self.op.remote_node is not None or self.op.iallocator is not None:
1641
      # Not replacing the secondary
1642
      raise errors.OpPrereqError("The iallocator and new node options can"
1643
                                 " only be used when changing the"
1644
                                 " secondary node", errors.ECODE_INVAL)
1645

    
1646
  def ExpandNames(self):
1647
    self._ExpandAndLockInstance()
1648

    
1649
    assert locking.LEVEL_NODE not in self.needed_locks
1650
    assert locking.LEVEL_NODE_RES not in self.needed_locks
1651
    assert locking.LEVEL_NODEGROUP not in self.needed_locks
1652

    
1653
    assert self.op.iallocator is None or self.op.remote_node is None, \
1654
      "Conflicting options"
1655

    
1656
    if self.op.remote_node is not None:
1657
      (self.op.remote_node_uuid, self.op.remote_node) = \
1658
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
1659
                              self.op.remote_node)
1660

    
1661
      # Warning: do not remove the locking of the new secondary here
1662
      # unless DRBD8Dev.AddChildren is changed to work in parallel;
1663
      # currently it doesn't since parallel invocations of
1664
      # FindUnusedMinor will conflict
1665
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid]
1666
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1667
    else:
1668
      self.needed_locks[locking.LEVEL_NODE] = []
1669
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1670

    
1671
      if self.op.iallocator is not None:
1672
        # iallocator will select a new node in the same group
1673
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
1674
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1675

    
1676
    self.needed_locks[locking.LEVEL_NODE_RES] = []
1677

    
1678
    self.replacer = TLReplaceDisks(self, self.op.instance_uuid,
1679
                                   self.op.instance_name, self.op.mode,
1680
                                   self.op.iallocator, self.op.remote_node_uuid,
1681
                                   self.op.disks, self.op.early_release,
1682
                                   self.op.ignore_ipolicy)
1683

    
1684
    self.tasklets = [self.replacer]
1685

    
1686
  def DeclareLocks(self, level):
1687
    if level == locking.LEVEL_NODEGROUP:
1688
      assert self.op.remote_node_uuid is None
1689
      assert self.op.iallocator is not None
1690
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
1691

    
1692
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
1693
      # Lock all groups used by instance optimistically; this requires going
1694
      # via the node before it's locked, requiring verification later on
1695
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
1696
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid)
1697

    
1698
    elif level == locking.LEVEL_NODE:
1699
      if self.op.iallocator is not None:
1700
        assert self.op.remote_node_uuid is None
1701
        assert not self.needed_locks[locking.LEVEL_NODE]
1702
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
1703

    
1704
        # Lock member nodes of all locked groups
1705
        self.needed_locks[locking.LEVEL_NODE] = \
1706
          [node_uuid
1707
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1708
           for node_uuid in self.cfg.GetNodeGroup(group_uuid).members]
1709
      else:
1710
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1711

    
1712
        self._LockInstancesNodes()
1713

    
1714
    elif level == locking.LEVEL_NODE_RES:
1715
      # Reuse node locks
1716
      self.needed_locks[locking.LEVEL_NODE_RES] = \
1717
        self.needed_locks[locking.LEVEL_NODE]
1718

    
1719
  def BuildHooksEnv(self):
1720
    """Build hooks env.
1721

1722
    This runs on the master, the primary and all the secondaries.
1723

1724
    """
1725
    instance = self.replacer.instance
1726
    env = {
1727
      "MODE": self.op.mode,
1728
      "NEW_SECONDARY": self.op.remote_node,
1729
      "OLD_SECONDARY": self.cfg.GetNodeName(instance.secondary_nodes[0]),
1730
      }
1731
    env.update(BuildInstanceHookEnvByObject(self, instance))
1732
    return env
1733

    
1734
  def BuildHooksNodes(self):
1735
    """Build hooks nodes.
1736

1737
    """
1738
    instance = self.replacer.instance
1739
    nl = [
1740
      self.cfg.GetMasterNode(),
1741
      instance.primary_node,
1742
      ]
1743
    if self.op.remote_node_uuid is not None:
1744
      nl.append(self.op.remote_node_uuid)
1745
    return nl, nl
1746

    
1747
  def CheckPrereq(self):
1748
    """Check prerequisites.
1749

1750
    """
1751
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
1752
            self.op.iallocator is None)
1753

    
1754
    # Verify if node group locks are still correct
1755
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
1756
    if owned_groups:
1757
      CheckInstanceNodeGroups(self.cfg, self.op.instance_uuid, owned_groups)
1758

    
1759
    return LogicalUnit.CheckPrereq(self)
1760

    
1761

    
1762
class LUInstanceActivateDisks(NoHooksLU):
1763
  """Bring up an instance's disks.
1764

1765
  """
1766
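  # Result sketch: Exec returns the device list built by
  # AssembleInstanceDisks, i.e. one triple per disk such as (values are
  # illustrative):
  #
  #   ("node1.example.com", "disk/0", "/dev/drbd0")
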
  REQ_BGL = False
1767

    
1768
  def ExpandNames(self):
1769
    self._ExpandAndLockInstance()
1770
    self.needed_locks[locking.LEVEL_NODE] = []
1771
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1772

    
1773
  def DeclareLocks(self, level):
1774
    if level == locking.LEVEL_NODE:
1775
      self._LockInstancesNodes()
1776

    
1777
  def CheckPrereq(self):
1778
    """Check prerequisites.
1779

1780
    This checks that the instance is in the cluster.
1781

1782
    """
1783
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1784
    assert self.instance is not None, \
1785
      "Cannot retrieve locked instance %s" % self.op.instance_name
1786
    CheckNodeOnline(self, self.instance.primary_node)
1787

    
1788
  def Exec(self, feedback_fn):
1789
    """Activate the disks.
1790

1791
    """
1792
    disks_ok, disks_info = \
1793
              AssembleInstanceDisks(self, self.instance,
1794
                                    ignore_size=self.op.ignore_size)
1795
    if not disks_ok:
1796
      raise errors.OpExecError("Cannot activate block devices")
1797

    
1798
    if self.op.wait_for_sync:
1799
      if not WaitForSync(self, self.instance):
1800
        self.cfg.MarkInstanceDisksInactive(self.instance.uuid)
1801
        raise errors.OpExecError("Some disks of the instance are degraded!")
1802

    
1803
    return disks_info
1804

    
1805

    
1806
class LUInstanceDeactivateDisks(NoHooksLU):
1807
  """Shutdown an instance's disks.
1808

1809
  """
1810
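  # Behaviour sketch: with self.op.force the disks are shut down
  # unconditionally via ShutdownInstanceDisks; otherwise
  # _SafeShutdownInstanceDisks is used, which refuses to act unless the
  # instance is already stopped.
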
  REQ_BGL = False
1811

    
1812
  def ExpandNames(self):
1813
    self._ExpandAndLockInstance()
1814
    self.needed_locks[locking.LEVEL_NODE] = []
1815
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1816

    
1817
  def DeclareLocks(self, level):
1818
    if level == locking.LEVEL_NODE:
1819
      self._LockInstancesNodes()
1820

    
1821
  def CheckPrereq(self):
1822
    """Check prerequisites.
1823

1824
    This checks that the instance is in the cluster.
1825

1826
    """
1827
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1828
    assert self.instance is not None, \
1829
      "Cannot retrieve locked instance %s" % self.op.instance_name
1830

    
1831
  def Exec(self, feedback_fn):
1832
    """Deactivate the disks
1833

1834
    """
1835
    if self.op.force:
1836
      ShutdownInstanceDisks(self, self.instance)
1837
    else:
1838
      _SafeShutdownInstanceDisks(self, self.instance)
1839

    
1840

    
1841
def _CheckDiskConsistencyInner(lu, instance, dev, node_uuid, on_primary,
1842
                               ldisk=False):
1843
  """Check that mirrors are not degraded.
1844

1845
  @attention: The device has to be annotated already.
1846

1847
  The ldisk parameter, if True, will change the test from the
1848
  is_degraded attribute (which represents overall non-ok status for
1849
  the device(s)) to the ldisk (representing the local storage status).
1850

1851
  """
1852
  result = True
1853

    
1854
  if on_primary or dev.AssembleOnSecondary():
1855
    rstats = lu.rpc.call_blockdev_find(node_uuid, (dev, instance))
1856
    msg = rstats.fail_msg
1857
    if msg:
1858
      lu.LogWarning("Can't find disk on node %s: %s",
1859
                    lu.cfg.GetNodeName(node_uuid), msg)
1860
      result = False
1861
    elif not rstats.payload:
1862
      lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid))
1863
      result = False
1864
    else:
1865
      if ldisk:
1866
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
1867
      else:
1868
        result = result and not rstats.payload.is_degraded
1869

    
1870
  if dev.children:
1871
    for child in dev.children:
1872
      result = result and _CheckDiskConsistencyInner(lu, instance, child,
1873
                                                     node_uuid, on_primary)
1874

    
1875
  return result
1876

    
1877

    
1878
def CheckDiskConsistency(lu, instance, dev, node_uuid, on_primary, ldisk=False):
1879
  """Wrapper around L{_CheckDiskConsistencyInner}.
1880

1881
  """
1882
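  # Illustrative call (hypothetical caller and node variable): checking only
  # the local storage status of a device on a secondary node:
  #
  #   CheckDiskConsistency(self.lu, self.instance, dev, secondary_node_uuid,
  #                        False, ldisk=True)
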
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1883
  return _CheckDiskConsistencyInner(lu, instance, disk, node_uuid, on_primary,
1884
                                    ldisk=ldisk)
1885

    
1886

    
1887
def _BlockdevFind(lu, node_uuid, dev, instance):
1888
  """Wrapper around call_blockdev_find to annotate diskparams.
1889

1890
  @param lu: A reference to the lu object
1891
  @param node_uuid: The node to call out
1892
  @param dev: The device to find
1893
  @param instance: The instance object the device belongs to
1894
  @returns The result of the rpc call
1895

1896
  """
1897
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1898
  return lu.rpc.call_blockdev_find(node_uuid, (disk, instance))
1899

    
1900

    
1901
def _GenerateUniqueNames(lu, exts):
1902
  """Generate a suitable LV name.
1903

1904
  This will generate a logical volume name for the given instance.
1905

1906
  """
1907
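  # Example (illustrative): for exts == [".disk0_data", ".disk0_meta"] this
  # returns something like
  #   ["<unique-id>.disk0_data", "<unique-id>.disk0_meta"]
  # where <unique-id> is freshly generated for each element.
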
  results = []
1908
  for val in exts:
1909
    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
1910
    results.append("%s%s" % (new_id, val))
1911
  return results
1912

    
1913

    
1914
class TLReplaceDisks(Tasklet):
1915
  """Replaces disks for an instance.
1916

1917
  Note: Locking is not within the scope of this class.
1918

1919
  """
1920
  def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name,
1921
               remote_node_uuid, disks, early_release, ignore_ipolicy):
1922
    """Initializes this class.
1923

1924
    """
1925
    Tasklet.__init__(self, lu)
1926

    
1927
    # Parameters
1928
    self.instance_uuid = instance_uuid
1929
    self.instance_name = instance_name
1930
    self.mode = mode
1931
    self.iallocator_name = iallocator_name
1932
    self.remote_node_uuid = remote_node_uuid
1933
    self.disks = disks
1934
    self.early_release = early_release
1935
    self.ignore_ipolicy = ignore_ipolicy
1936

    
1937
    # Runtime data
1938
    self.instance = None
1939
    self.new_node_uuid = None
1940
    self.target_node_uuid = None
1941
    self.other_node_uuid = None
1942
    self.remote_node_info = None
1943
    self.node_secondary_ip = None
1944

    
1945
  @staticmethod
1946
  def _RunAllocator(lu, iallocator_name, instance_uuid,
1947
                    relocate_from_node_uuids):
1948
    """Compute a new secondary node using an IAllocator.
1949

1950
    """
1951
    req = iallocator.IAReqRelocate(
1952
          inst_uuid=instance_uuid,
1953
          relocate_from_node_uuids=list(relocate_from_node_uuids))
1954
    ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
1955

    
1956
    ial.Run(iallocator_name)
1957

    
1958
    if not ial.success:
1959
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
1960
                                 " %s" % (iallocator_name, ial.info),
1961
                                 errors.ECODE_NORES)
1962

    
1963
    remote_node_name = ial.result[0]
1964
    remote_node = lu.cfg.GetNodeInfoByName(remote_node_name)
1965

    
1966
    if remote_node is None:
1967
      raise errors.OpPrereqError("Node %s not found in configuration" %
1968
                                 remote_node_name, errors.ECODE_NOENT)
1969

    
1970
    lu.LogInfo("Selected new secondary for instance '%s': %s",
1971
               instance_uuid, remote_node_name)
1972

    
1973
    return remote_node.uuid
1974

    
1975
  def _FindFaultyDisks(self, node_uuid):
1976
    """Wrapper for L{FindFaultyInstanceDisks}.
1977

1978
    """
1979
    return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
1980
                                   node_uuid, True)
1981

    
1982
  def _CheckDisksActivated(self, instance):
1983
    """Checks if the instance disks are activated.
1984

1985
    @param instance: The instance to check disks
1986
    @return: True if they are activated, False otherwise
1987

1988
    """
1989
    node_uuids = instance.all_nodes
1990

    
1991
    for idx, dev in enumerate(instance.disks):
1992
      for node_uuid in node_uuids:
1993
        self.lu.LogInfo("Checking disk/%d on %s", idx,
1994
                        self.cfg.GetNodeName(node_uuid))
1995

    
1996
        result = _BlockdevFind(self, node_uuid, dev, instance)
1997

    
1998
        if result.offline:
1999
          continue
2000
        elif result.fail_msg or not result.payload:
2001
          return False
2002

    
2003
    return True
2004

    
2005
  def CheckPrereq(self):
2006
    """Check prerequisites.
2007

2008
    This checks that the instance is in the cluster.
2009

2010
    """
2011
    self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
2012
    assert self.instance is not None, \
2013
      "Cannot retrieve locked instance %s" % self.instance_name
2014

    
2015
    if self.instance.disk_template != constants.DT_DRBD8:
2016
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
2017
                                 " instances", errors.ECODE_INVAL)
2018

    
2019
    if len(self.instance.secondary_nodes) != 1:
2020
      raise errors.OpPrereqError("The instance has a strange layout,"
2021
                                 " expected one secondary but found %d" %
2022
                                 len(self.instance.secondary_nodes),
2023
                                 errors.ECODE_FAULT)
2024

    
2025
    secondary_node_uuid = self.instance.secondary_nodes[0]
2026

    
2027
    if self.iallocator_name is None:
2028
      remote_node_uuid = self.remote_node_uuid
2029
    else:
2030
      remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
2031
                                            self.instance.uuid,
2032
                                            self.instance.secondary_nodes)
2033

    
2034
    if remote_node_uuid is None:
2035
      self.remote_node_info = None
2036
    else:
2037
      assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
2038
             "Remote node '%s' is not locked" % remote_node_uuid
2039

    
2040
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
2041
      assert self.remote_node_info is not None, \
2042
        "Cannot retrieve locked node %s" % remote_node_uuid
2043

    
2044
    if remote_node_uuid == self.instance.primary_node:
2045
      raise errors.OpPrereqError("The specified node is the primary node of"
2046
                                 " the instance", errors.ECODE_INVAL)
2047

    
2048
    if remote_node_uuid == secondary_node_uuid:
2049
      raise errors.OpPrereqError("The specified node is already the"
2050
                                 " secondary node of the instance",
2051
                                 errors.ECODE_INVAL)
2052

    
2053
    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
2054
                                    constants.REPLACE_DISK_CHG):
2055
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
2056
                                 errors.ECODE_INVAL)
2057

    
2058
    if self.mode == constants.REPLACE_DISK_AUTO:
2059
      if not self._CheckDisksActivated(self.instance):
2060
        raise errors.OpPrereqError("Please run activate-disks on instance %s"
2061
                                   " first" % self.instance_name,
2062
                                   errors.ECODE_STATE)
2063
      faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
2064
      faulty_secondary = self._FindFaultyDisks(secondary_node_uuid)
2065

    
2066
      if faulty_primary and faulty_secondary:
2067
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
2068
                                   " one node and can not be repaired"
2069
                                   " automatically" % self.instance_name,
2070
                                   errors.ECODE_STATE)
2071

    
2072
      if faulty_primary:
2073
        self.disks = faulty_primary
2074
        self.target_node_uuid = self.instance.primary_node
2075
        self.other_node_uuid = secondary_node_uuid
2076
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
2077
      elif faulty_secondary:
2078
        self.disks = faulty_secondary
2079
        self.target_node_uuid = secondary_node_uuid
2080
        self.other_node_uuid = self.instance.primary_node
2081
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
2082
      else:
2083
        self.disks = []
2084
        check_nodes = []
2085

    
2086
    else:
2087
      # Non-automatic modes
2088
      if self.mode == constants.REPLACE_DISK_PRI:
2089
        self.target_node_uuid = self.instance.primary_node
2090
        self.other_node_uuid = secondary_node_uuid
2091
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
2092

    
2093
      elif self.mode == constants.REPLACE_DISK_SEC:
2094
        self.target_node_uuid = secondary_node_uuid
2095
        self.other_node_uuid = self.instance.primary_node
2096
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
2097

    
2098
      elif self.mode == constants.REPLACE_DISK_CHG:
2099
        self.new_node_uuid = remote_node_uuid
2100
        self.other_node_uuid = self.instance.primary_node
2101
        self.target_node_uuid = secondary_node_uuid
2102
        check_nodes = [self.new_node_uuid, self.other_node_uuid]
2103

    
2104
        CheckNodeNotDrained(self.lu, remote_node_uuid)
2105
        CheckNodeVmCapable(self.lu, remote_node_uuid)
2106

    
2107
        old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid)
2108
        assert old_node_info is not None
2109
        if old_node_info.offline and not self.early_release:
2110
          # doesn't make sense to delay the release
2111
          self.early_release = True
2112
          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
2113
                          " early-release mode", secondary_node_uuid)
2114

    
2115
      else:
2116
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
2117
                                     self.mode)
2118

    
2119
      # If not specified all disks should be replaced
2120
      if not self.disks:
2121
        self.disks = range(len(self.instance.disks))
2122

    
2123
    # TODO: This is ugly, but right now we can't distinguish between internal
2124
    # submitted opcode and external one. We should fix that.
2125
    if self.remote_node_info:
2126
      # We change the node, lets verify it still meets instance policy
2127
      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
2128
      cluster = self.cfg.GetClusterInfo()
2129
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2130
                                                              new_group_info)
2131
      CheckTargetNodeIPolicy(self, ipolicy, self.instance,
2132
                             self.remote_node_info, self.cfg,
2133
                             ignore=self.ignore_ipolicy)
2134

    
2135
    for node_uuid in check_nodes:
2136
      CheckNodeOnline(self.lu, node_uuid)
2137

    
2138
    touched_nodes = frozenset(node_uuid for node_uuid in [self.new_node_uuid,
2139
                                                          self.other_node_uuid,
2140
                                                          self.target_node_uuid]
2141
                              if node_uuid is not None)
2142

    
2143
    # Release unneeded node and node resource locks
2144
    ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
2145
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
2146
    ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
2147

    
2148
    # Release any owned node group
2149
    ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
2150

    
2151
    # Check whether disks are valid
2152
    for disk_idx in self.disks:
2153
      self.instance.FindDisk(disk_idx)
2154

    
2155
    # Get secondary node IP addresses
2156
    self.node_secondary_ip = dict((uuid, node.secondary_ip) for (uuid, node)
2157
                                  in self.cfg.GetMultiNodeInfo(touched_nodes))
2158

    
2159
  def Exec(self, feedback_fn):
2160
    """Execute disk replacement.
2161

2162
    This dispatches the disk replacement to the appropriate handler.
2163

2164
    """
2165
    if __debug__:
2166
      # Verify owned locks before starting operation
2167
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
2168
      assert set(owned_nodes) == set(self.node_secondary_ip), \
2169
          ("Incorrect node locks, owning %s, expected %s" %
2170
           (owned_nodes, self.node_secondary_ip.keys()))
2171
      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
2172
              self.lu.owned_locks(locking.LEVEL_NODE_RES))
2173
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
2174

    
2175
      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
2176
      assert list(owned_instances) == [self.instance_name], \
2177
          "Instance '%s' not locked" % self.instance_name
2178

    
2179
      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
2180
          "Should not own any node group lock at this point"
2181

    
2182
    if not self.disks:
2183
      feedback_fn("No disks need replacement for instance '%s'" %
2184
                  self.instance.name)
2185
      return
2186

    
2187
    feedback_fn("Replacing disk(s) %s for instance '%s'" %
2188
                (utils.CommaJoin(self.disks), self.instance.name))
2189
    feedback_fn("Current primary node: %s" %
2190
                self.cfg.GetNodeName(self.instance.primary_node))
2191
    feedback_fn("Current seconary node: %s" %
2192
                utils.CommaJoin(self.cfg.GetNodeNames(
2193
                                  self.instance.secondary_nodes)))
2194

    
2195
    activate_disks = not self.instance.disks_active
2196

    
2197
    # Activate the instance disks if we're replacing them on a down instance
2198
    if activate_disks:
2199
      StartInstanceDisks(self.lu, self.instance, True)
2200

    
2201
    try:
2202
      # Should we replace the secondary node?
2203
      if self.new_node_uuid is not None:
2204
        fn = self._ExecDrbd8Secondary
2205
      else:
2206
        fn = self._ExecDrbd8DiskOnly
2207

    
2208
      result = fn(feedback_fn)
2209
    finally:
2210
      # Deactivate the instance disks if we're replacing them on a
2211
      # down instance
2212
      if activate_disks:
2213
        _SafeShutdownInstanceDisks(self.lu, self.instance)
2214

    
2215
    assert not self.lu.owned_locks(locking.LEVEL_NODE)
2216

    
2217
    if __debug__:
2218
      # Verify owned locks
2219
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
2220
      nodes = frozenset(self.node_secondary_ip)
2221
      assert ((self.early_release and not owned_nodes) or
2222
              (not self.early_release and not (set(owned_nodes) - nodes))), \
2223
        ("Not owning the correct locks, early_release=%s, owned=%r,"
2224
         " nodes=%r" % (self.early_release, owned_nodes, nodes))
2225

    
2226
    return result
2227

    
2228
  def _CheckVolumeGroup(self, node_uuids):
2229
    self.lu.LogInfo("Checking volume groups")
2230

    
2231
    vgname = self.cfg.GetVGName()
2232

    
2233
    # Make sure volume group exists on all involved nodes
2234
    results = self.rpc.call_vg_list(node_uuids)
2235
    if not results:
2236
      raise errors.OpExecError("Can't list volume groups on the nodes")
2237

    
2238
    for node_uuid in node_uuids:
2239
      res = results[node_uuid]
2240
      res.Raise("Error checking node %s" % self.cfg.GetNodeName(node_uuid))
2241
      if vgname not in res.payload:
2242
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
2243
                                 (vgname, self.cfg.GetNodeName(node_uuid)))
2244

    
2245
  def _CheckDisksExistence(self, node_uuids):
2246
    # Check disk existence
2247
    for idx, dev in enumerate(self.instance.disks):
2248
      if idx not in self.disks:
2249
        continue
2250

    
2251
      for node_uuid in node_uuids:
2252
        self.lu.LogInfo("Checking disk/%d on %s", idx,
2253
                        self.cfg.GetNodeName(node_uuid))
2254

    
2255
        result = _BlockdevFind(self, node_uuid, dev, self.instance)
2256

    
2257
        msg = result.fail_msg
2258
        if msg or not result.payload:
2259
          if not msg:
2260
            msg = "disk not found"
2261
          if not self._CheckDisksActivated(self.instance):
2262
            extra_hint = ("\nDisks seem to be not properly activated. Try"
2263
                          " running activate-disks on the instance before"
2264
                          " using replace-disks.")
2265
          else:
2266
            extra_hint = ""
2267
          raise errors.OpExecError("Can't find disk/%d on node %s: %s%s" %
2268
                                   (idx, self.cfg.GetNodeName(node_uuid), msg,
2269
                                    extra_hint))
2270

    
2271
  def _CheckDisksConsistency(self, node_uuid, on_primary, ldisk):
2272
    for idx, dev in enumerate(self.instance.disks):
2273
      if idx not in self.disks:
2274
        continue
2275

    
2276
      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
2277
                      (idx, self.cfg.GetNodeName(node_uuid)))
2278

    
2279
      if not CheckDiskConsistency(self.lu, self.instance, dev, node_uuid,
2280
                                  on_primary, ldisk=ldisk):
2281
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
2282
                                 " replace disks for instance %s" %
2283
                                 (self.cfg.GetNodeName(node_uuid),
2284
                                  self.instance.name))
2285

    
2286
  def _CreateNewStorage(self, node_uuid):
2287
    """Create new storage on the primary or secondary node.
2288

2289
    This is only used for same-node replaces, not for changing the
2290
    secondary node, hence we don't want to modify the existing disk.
2291

2292
    """
2293
    iv_names = {}
2294

    
2295
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2296
    for idx, dev in enumerate(disks):
2297
      if idx not in self.disks:
2298
        continue
2299

    
2300
      self.lu.LogInfo("Adding storage on %s for disk/%d",
2301
                      self.cfg.GetNodeName(node_uuid), idx)
2302

    
2303
      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
2304
      names = _GenerateUniqueNames(self.lu, lv_names)
2305

    
2306
      (data_disk, meta_disk) = dev.children
2307
      vg_data = data_disk.logical_id[0]
2308
      lv_data = objects.Disk(dev_type=constants.DT_PLAIN, size=dev.size,
2309
                             logical_id=(vg_data, names[0]),
2310
                             params=data_disk.params)
2311
      vg_meta = meta_disk.logical_id[0]
2312
      lv_meta = objects.Disk(dev_type=constants.DT_PLAIN,
2313
                             size=constants.DRBD_META_SIZE,
2314
                             logical_id=(vg_meta, names[1]),
2315
                             params=meta_disk.params)
2316

    
2317
      new_lvs = [lv_data, lv_meta]
2318
      old_lvs = [child.Copy() for child in dev.children]
2319
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
2320
      excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)
2321

    
2322
      # we pass force_create=True to force the LVM creation
2323
      for new_lv in new_lvs:
2324
        try:
2325
          _CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True,
2326
                               GetInstanceInfoText(self.instance), False,
2327
                               excl_stor)
2328
        except errors.DeviceCreationError, e:
2329
          raise errors.OpExecError("Can't create block device: %s" % e.message)
2330

    
2331
    return iv_names
2332

    
2333
  def _CheckDevices(self, node_uuid, iv_names):
2334
    for name, (dev, _, _) in iv_names.iteritems():
2335
      result = _BlockdevFind(self, node_uuid, dev, self.instance)
2336

    
2337
      msg = result.fail_msg
2338
      if msg or not result.payload:
2339
        if not msg:
2340
          msg = "disk not found"
2341
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
2342
                                 (name, msg))
2343

    
2344
      if result.payload.is_degraded:
2345
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
2346

    
2347
  def _RemoveOldStorage(self, node_uuid, iv_names):
2348
    for name, (_, old_lvs, _) in iv_names.iteritems():
2349
      self.lu.LogInfo("Remove logical volumes for %s", name)
2350

    
2351
      for lv in old_lvs:
2352
        msg = self.rpc.call_blockdev_remove(node_uuid, (lv, self.instance)) \
2353
                .fail_msg
2354
        if msg:
2355
          self.lu.LogWarning("Can't remove old LV: %s", msg,
2356
                             hint="remove unused LVs manually")
2357

    
2358
  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
2359
    """Replace a disk on the primary or secondary for DRBD 8.
2360

2361
    The algorithm for replace is quite complicated:
2362

2363
      1. for each disk to be replaced:
2364

2365
        1. create new LVs on the target node with unique names
2366
        1. detach old LVs from the drbd device
2367
        1. rename old LVs to name_replaced.<time_t>
2368
        1. rename new LVs to old LVs
2369
        1. attach the new LVs (with the old names now) to the drbd device
2370

2371
      1. wait for sync across all devices
2372

2373
      1. for each modified disk:
2374

2375
        1. remove old LVs (which have the name name_replaced.<time_t>)
2376

2377
    Failures are not very well handled.
2378

2379
    """
2380
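    # Naming sketch for the rename dance below (illustrative volume group
    # and names): an existing data LV "xenvg/<id>.disk0_data" is first
    # renamed to "xenvg/<id>.disk0_data_replaced-<time_t>", then the newly
    # created LV is renamed to the original name and re-attached to the
    # DRBD device.
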
    steps_total = 6
2381

    
2382
    # Step: check device activation
2383
    self.lu.LogStep(1, steps_total, "Check device existence")
2384
    self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
2385
    self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])
2386

    
2387
    # Step: check other node consistency
2388
    self.lu.LogStep(2, steps_total, "Check peer consistency")
2389
    self._CheckDisksConsistency(
2390
      self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
2391
      False)
2392

    
2393
    # Step: create new storage
2394
    self.lu.LogStep(3, steps_total, "Allocate new storage")
2395
    iv_names = self._CreateNewStorage(self.target_node_uuid)
2396

    
2397
    # Step: for each lv, detach+rename*2+attach
2398
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2399
    for dev, old_lvs, new_lvs in iv_names.itervalues():
2400
      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
2401

    
2402
      result = self.rpc.call_blockdev_removechildren(self.target_node_uuid,
2403
                                                     (dev, self.instance),
2404
                                                     (old_lvs, self.instance))
2405
      result.Raise("Can't detach drbd from local storage on node"
2406
                   " %s for device %s" %
2407
                   (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))
2408
      #dev.children = []
2409
      #cfg.Update(instance)
2410

    
2411
      # ok, we created the new LVs, so now we know we have the needed
2412
      # storage; as such, we proceed on the target node to rename
2413
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
2414
      # using the assumption that logical_id == unique_id on that node
2415

    
2416
      # FIXME(iustin): use a better name for the replaced LVs
2417
      temp_suffix = int(time.time())
2418
      ren_fn = lambda d, suff: (d.logical_id[0],
2419
                                d.logical_id[1] + "_replaced-%s" % suff)
2420

    
2421
      # Build the rename list based on what LVs exist on the node
2422
      rename_old_to_new = []
2423
      for to_ren in old_lvs:
2424
        result = self.rpc.call_blockdev_find(self.target_node_uuid,
2425
                                             (to_ren, self.instance))
2426
        if not result.fail_msg and result.payload:
2427
          # device exists
2428
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
2429

    
2430
      self.lu.LogInfo("Renaming the old LVs on the target node")
2431
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2432
                                             rename_old_to_new)
2433
      result.Raise("Can't rename old LVs on node %s" %
2434
                   self.cfg.GetNodeName(self.target_node_uuid))
2435

    
2436
      # Now we rename the new LVs to the old LVs
2437
      self.lu.LogInfo("Renaming the new LVs on the target node")
2438
      rename_new_to_old = [(new, old.logical_id)
2439
                           for old, new in zip(old_lvs, new_lvs)]
2440
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2441
                                             rename_new_to_old)
2442
      result.Raise("Can't rename new LVs on node %s" %
2443
                   self.cfg.GetNodeName(self.target_node_uuid))
2444

    
2445
      # Intermediate steps of in memory modifications
2446
      for old, new in zip(old_lvs, new_lvs):
2447
        new.logical_id = old.logical_id
2448

    
2449
      # We need to modify old_lvs so that removal later removes the
2450
      # right LVs, not the newly added ones; note that old_lvs is a
2451
      # copy here
2452
      for disk in old_lvs:
2453
        disk.logical_id = ren_fn(disk, temp_suffix)
2454

    
2455
      # Now that the new lvs have the old name, we can add them to the device
2456
      self.lu.LogInfo("Adding new mirror component on %s",
2457
                      self.cfg.GetNodeName(self.target_node_uuid))
2458
      result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
2459
                                                  (dev, self.instance),
2460
                                                  (new_lvs, self.instance))
2461
      msg = result.fail_msg
2462
      if msg:
2463
        for new_lv in new_lvs:
2464
          msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
2465
                                               (new_lv, self.instance)).fail_msg
2466
          if msg2:
2467
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
2468
                               hint=("cleanup manually the unused logical"
2469
                                     "volumes"))
2470
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
2471

    
2472
    cstep = itertools.count(5)
2473

    
2474
    if self.early_release:
2475
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2476
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
2477
      # TODO: Check if releasing locks early still makes sense
2478
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2479
    else:
2480
      # Release all resource locks except those used by the instance
2481
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2482
                   keep=self.node_secondary_ip.keys())
2483

    
2484
    # Release all node locks while waiting for sync
2485
    ReleaseLocks(self.lu, locking.LEVEL_NODE)
2486

    
2487
    # TODO: Can the instance lock be downgraded here? Take the optional disk
2488
    # shutdown in the caller into consideration.
2489

    
2490
    # Wait for sync
2491
    # This can fail as the old devices are degraded and _WaitForSync
2492
    # does a combined result over all disks, so we don't check its return value
2493
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2494
    WaitForSync(self.lu, self.instance)
2495

    
2496
    # Check all devices manually
2497
    self._CheckDevices(self.instance.primary_node, iv_names)
2498

    
2499
    # Step: remove old storage
2500
    if not self.early_release:
2501
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2502
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
2503

    
2504
  def _ExecDrbd8Secondary(self, feedback_fn):
2505
    """Replace the secondary node for DRBD 8.
2506

2507
    The algorithm for replace is quite complicated:
2508
      - for all disks of the instance:
2509
        - create new LVs on the new node with same names
2510
        - shutdown the drbd device on the old secondary
2511
        - disconnect the drbd network on the primary
2512
        - create the drbd device on the new secondary
2513
        - network attach the drbd on the primary, using an artifice:
2514
          the drbd code for Attach() will connect to the network if it
2515
          finds a device which is connected to the good local disks but
2516
          not network enabled
2517
      - wait for sync across all devices
2518
      - remove all disks from the old secondary
2519

2520
    Failures are not very well handled.
2521

2522
    """
2523
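    # logical_id sketch for step 4 below (illustrative names): for each disk
    # two DRBD IDs are built from the old one:
    #
    #   new_alone_id = (pnode_uuid, new_node_uuid, None, p_minor, new_minor,
    #                   secret)   # no port: assembled standalone first
    #   new_net_id = (pnode_uuid, new_node_uuid, port, p_minor, new_minor,
    #                 secret)     # used later for the network attach
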
    steps_total = 6
2524

    
2525
    pnode = self.instance.primary_node
2526

    
2527
    # Step: check device activation
2528
    self.lu.LogStep(1, steps_total, "Check device existence")
2529
    self._CheckDisksExistence([self.instance.primary_node])
2530
    self._CheckVolumeGroup([self.instance.primary_node])
2531

    
2532
    # Step: check other node consistency
2533
    self.lu.LogStep(2, steps_total, "Check peer consistency")
2534
    self._CheckDisksConsistency(self.instance.primary_node, True, True)
2535

    
2536
    # Step: create new storage
2537
    self.lu.LogStep(3, steps_total, "Allocate new storage")
2538
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2539
    excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg,
2540
                                                  self.new_node_uuid)
2541
    for idx, dev in enumerate(disks):
2542
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
2543
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
2544
      # we pass force_create=True to force LVM creation
2545
      for new_lv in dev.children:
2546
        try:
2547
          _CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance,
2548
                               new_lv, True, GetInstanceInfoText(self.instance),
2549
                               False, excl_stor)
2550
        except errors.DeviceCreationError, e:
2551
          raise errors.OpExecError("Can't create block device: %s" % e.message)
2552

    
2553
    # Step 4: drbd minors and drbd setup changes
2554
    # after this, we must manually remove the drbd minors on both the
2555
    # error and the success paths
2556
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2557
    minors = self.cfg.AllocateDRBDMinor([self.new_node_uuid
2558
                                         for _ in self.instance.disks],
2559
                                        self.instance.uuid)
2560
    logging.debug("Allocated minors %r", minors)
2561

    
2562
    iv_names = {}
2563
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
2564
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
2565
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
2566
      # create new devices on new_node; note that we create two IDs:
2567
      # one without port, so the drbd will be activated without
2568
      # networking information on the new node at this stage, and one
2569
      # with network, for the latter activation in step 4
2570
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
2571
      if self.instance.primary_node == o_node1:
2572
        p_minor = o_minor1
2573
      else:
2574
        assert self.instance.primary_node == o_node2, "Three-node instance?"
2575
        p_minor = o_minor2
2576

    
2577
      new_alone_id = (self.instance.primary_node, self.new_node_uuid, None,
2578
                      p_minor, new_minor, o_secret)
2579
      new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port,
2580
                    p_minor, new_minor, o_secret)

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.DT_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children,
                              size=dev.size,
                              params={})
      (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
                                            self.cfg)
      try:
        CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance,
                             anno_new_drbd,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)
      except errors.GenericError:
        self.cfg.ReleaseDRBDMinors(self.instance.uuid)
        raise

    # We have new devices, shutdown the drbd on the old secondary
    for idx, dev in enumerate(self.instance.disks):
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
      msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid,
                                            (dev, self.instance)).fail_msg
      if msg:
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                           " node: %s" % (idx, msg),
                           hint=("Please cleanup this device manually as"
                                 " soon as possible"))

    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
    result = self.rpc.call_drbd_disconnect_net(
               [pnode], (self.instance.disks, self.instance))[pnode]

    msg = result.fail_msg
    if msg:
      # detaches didn't succeed (unlikely)
      self.cfg.ReleaseDRBDMinors(self.instance.uuid)
      raise errors.OpExecError("Can't detach the disks from the network on"
                               " old node: %s" % (msg,))

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    self.lu.LogInfo("Updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id

    self.cfg.Update(self.instance, feedback_fn)

    # Release all node locks (the configuration has been updated)
    ReleaseLocks(self.lu, locking.LEVEL_NODE)
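    # Only the node locks are released here; the node resource locks are
    # handled further down, depending on early_release.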

    # and now perform the drbd attach
    self.lu.LogInfo("Attaching primary drbds to new secondary"
                    " (standalone => connected)")
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
                                            self.new_node_uuid],
                                           (self.instance.disks, self.instance),
                                           self.instance.name,
                                           False)
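    # Attach failures below are reported as warnings only; the devices are
    # checked again after the sync step further down.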
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
                           self.cfg.GetNodeName(to_node), msg,
                           hint=("please do a gnt-instance info to see the"
                                 " status of disks"))

    cstep = itertools.count(5)
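    # cstep numbers the remaining steps, continuing after the explicitly
    # numbered steps above. With early_release the old storage is removed
    # before the new disks have finished syncing; otherwise removal is the
    # final step below.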

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)