lib/cmdlib/instance_storage.py @ 6ccce5d4

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with storage of instances."""

import itertools
import logging
import os
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import ht
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import objects
from ganeti import utils
import ganeti.rpc.node as rpc
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
  AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \
  CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
  IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes, \
  CheckDiskTemplateEnabled
from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
  CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy

import ganeti.masterd.instance


_DISK_TEMPLATE_NAME_PREFIX = {
  constants.DT_PLAIN: "",
  constants.DT_RBD: ".rbd",
  constants.DT_EXT: ".ext",
  constants.DT_FILE: ".file",
  constants.DT_SHARED_FILE: ".sharedfile",
  }


def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passes to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  """
  result = lu.rpc.call_blockdev_create(node_uuid, (device, instance),
                                       device.size, instance.name, force_open,
                                       info, excl_stor)
  result.Raise("Can't create block device %s on"
               " node %s for instance %s" % (device,
                                             lu.cfg.GetNodeName(node_uuid),
                                             instance.name))


def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
                         info, force_open, excl_stor):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @attention: The device has to be annotated already.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be change to True whenever we find a device which has
      CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passes to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  @return: list of created devices
  """
  created_devices = []
  try:
    if device.CreateOnSecondary():
      force_create = True

    if device.children:
      for child in device.children:
        devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
                                    force_create, info, force_open, excl_stor)
        created_devices.extend(devs)

    if not force_create:
      return created_devices

    CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor)
    # The device has been completely created, so there is no point in keeping
    # its subdevices in the list. We just add the device itself instead.
    created_devices = [(node_uuid, device)]
    return created_devices

  except errors.DeviceCreationError, e:
    e.created_devices.extend(created_devices)
    raise e
  except errors.OpExecError, e:
    raise errors.DeviceCreationError(str(e), created_devices)


def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid):
  """Whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node_uuid: string
  @param node_uuid: The node UUID
  @rtype: bool
  @return: The effective value of exclusive_storage
  @raise errors.OpPrereqError: if no node exists with the given name

  """
  ni = cfg.GetNodeInfo(node_uuid)
  if ni is None:
    raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
                               errors.ECODE_NOENT)
  return IsExclusiveStorageEnabledNode(cfg, ni)


def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
                    force_open):
  """Wrapper around L{_CreateBlockDevInner}.

  This method annotates the root device first.

  """
  (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
  excl_stor = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
  return _CreateBlockDevInner(lu, node_uuid, instance, disk, force_create, info,
                              force_open, excl_stor)


def _UndoCreateDisks(lu, disks_created, instance):
  """Undo the work performed by L{CreateDisks}.

  This function is called in case of an error to undo the work of
  L{CreateDisks}.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @param disks_created: the result returned by L{CreateDisks}
  @type instance: L{objects.Instance}
  @param instance: the instance for which disks were created

  """
  for (node_uuid, disk) in disks_created:
    result = lu.rpc.call_blockdev_remove(node_uuid, (disk, instance))
    result.Warn("Failed to remove newly-created disk %s on node %s" %
                (disk, lu.cfg.GetNodeName(node_uuid)), logging.warning)


def CreateDisks(lu, instance, to_skip=None, target_node_uuid=None, disks=None):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @type to_skip: list
  @param to_skip: list of indices to skip
  @type target_node_uuid: string
  @param target_node_uuid: if passed, overrides the target node for creation
  @type disks: list of {objects.Disk}
  @param disks: the disks to create; if not specified, all the disks of the
      instance are created
  @return: information about the created disks, to be used to call
      L{_UndoCreateDisks}
  @raise errors.OpPrereqError: in case of error

  """
  info = GetInstanceInfoText(instance)
  if target_node_uuid is None:
    pnode_uuid = instance.primary_node
    all_node_uuids = instance.all_nodes
  else:
    pnode_uuid = target_node_uuid
    all_node_uuids = [pnode_uuid]

  if disks is None:
    disks = instance.disks

  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), instance.disk_template)

  if instance.disk_template in constants.DTS_FILEBASED:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir)

    result.Raise("Failed to create directory '%s' on"
                 " node %s" % (file_storage_dir,
                               lu.cfg.GetNodeName(pnode_uuid)))

  disks_created = []
  for idx, device in enumerate(disks):
    if to_skip and idx in to_skip:
      continue
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
    for node_uuid in all_node_uuids:
      f_create = node_uuid == pnode_uuid
      try:
        _CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
                        f_create)
        disks_created.append((node_uuid, device))
      except errors.DeviceCreationError, e:
        logging.warning("Creating disk %s for instance '%s' failed",
                        idx, instance.name)
        disks_created.extend(e.created_devices)
        _UndoCreateDisks(lu, disks_created, instance)
        raise errors.OpExecError(e.message)
  return disks_created


def ComputeDiskSizePerVG(disk_template, disks):
  """Compute disk size requirements in the volume group

  """
  def _compute(disks, payload):
    """Universal algorithm.

    """
    vgs = {}
    for disk in disks:
      vgs[disk[constants.IDISK_VG]] = \
        vgs.get(constants.IDISK_VG, 0) + disk[constants.IDISK_SIZE] + payload

    return vgs

  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: {},
    constants.DT_PLAIN: _compute(disks, 0),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
    constants.DT_FILE: {},
    constants.DT_SHARED_FILE: {},
    constants.DT_GLUSTER: {},
    }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]

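# Illustrative note (not part of the upstream file): for a single disk
# description such as {constants.IDISK_VG: "xenvg", constants.IDISK_SIZE: 10240},
# ComputeDiskSizePerVG above yields {"xenvg": 10240} for constants.DT_PLAIN and
# {"xenvg": 10240 + constants.DRBD_META_SIZE} for constants.DT_DRBD8 (the extra
# 128 MB of DRBD metadata), while diskless and file-based templates need no
# volume-group space at all.
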
def ComputeDisks(op, default_vg):
  """Computes the instance disks.

  @param op: The instance opcode
  @param default_vg: The default_vg to assume

  @return: The computed disks

  """
  disks = []
  for disk in op.disks:
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
    if mode not in constants.DISK_ACCESS_SET:
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                 mode, errors.ECODE_INVAL)
    size = disk.get(constants.IDISK_SIZE, None)
    if size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      size = int(size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                 errors.ECODE_INVAL)

    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
    if ext_provider and op.disk_template != constants.DT_EXT:
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
                                 " disk template, not %s" %
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
                                  op.disk_template), errors.ECODE_INVAL)

    data_vg = disk.get(constants.IDISK_VG, default_vg)
    name = disk.get(constants.IDISK_NAME, None)
    if name is not None and name.lower() == constants.VALUE_NONE:
      name = None
    new_disk = {
      constants.IDISK_SIZE: size,
      constants.IDISK_MODE: mode,
      constants.IDISK_VG: data_vg,
      constants.IDISK_NAME: name,
      }

    for key in [
      constants.IDISK_METAVG,
      constants.IDISK_ADOPT,
      constants.IDISK_SPINDLES,
      ]:
      if key in disk:
        new_disk[key] = disk[key]

    # For extstorage, demand the `provider' option and add any
    # additional parameters (ext-params) to the dict
    if op.disk_template == constants.DT_EXT:
      if ext_provider:
        new_disk[constants.IDISK_PROVIDER] = ext_provider
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            new_disk[key] = disk[key]
      else:
        raise errors.OpPrereqError("Missing provider for template '%s'" %
                                   constants.DT_EXT, errors.ECODE_INVAL)

    disks.append(new_disk)

  return disks

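# Illustrative note (not part of the upstream file): with
# op.disks = [{constants.IDISK_SIZE: "10240"}] and default_vg = "xenvg",
# ComputeDisks above returns one dict with IDISK_SIZE=10240 (coerced to int),
# IDISK_MODE=constants.DISK_RDWR, IDISK_VG="xenvg" and IDISK_NAME=None; for the
# 'ext' template a constants.IDISK_PROVIDER entry is mandatory as well.
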
def CheckRADOSFreeSpace():
  """Compute disk size requirements inside the RADOS cluster.

  """
  # For the RADOS cluster we assume there is always enough space.
  pass


def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
                         iv_name, p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  assert len(vgnames) == len(names) == 2
  port = lu.cfg.AllocatePort()
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())

  dev_data = objects.Disk(dev_type=constants.DT_PLAIN, size=size,
                          logical_id=(vgnames[0], names[0]),
                          params={})
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  dev_meta = objects.Disk(dev_type=constants.DT_PLAIN,
                          size=constants.DRBD_META_SIZE,
                          logical_id=(vgnames[1], names[1]),
                          params={})
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  drbd_dev = objects.Disk(dev_type=constants.DT_DRBD8, size=size,
                          logical_id=(primary_uuid, secondary_uuid, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name, params={})
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  return drbd_dev

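# Illustrative note (not part of the upstream file): the object returned by
# _GenerateDRBD8Branch above is a DT_DRBD8 disk whose two DT_PLAIN children are
# the data LV of the requested size and a DRBD_META_SIZE metadata LV, and whose
# logical_id packs (primary_uuid, secondary_uuid, port, p_minor, s_minor,
# shared_secret), i.e. everything that identifies the two DRBD peers.
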
def GenerateDiskTemplate(
  lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
  disk_info, file_storage_dir, file_driver, base_index,
  feedback_fn, full_disk_params):
  """Generate the entire disk layout for a given template type.

  """
  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []

  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), template_name)

  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_DRBD8:
    if len(secondary_node_uuids) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node_uuid = secondary_node_uuids[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node_uuid, remote_node_uuid] * len(disk_info), instance_uuid)

    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
                                                       full_disk_params)
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]

    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      data_vg = disk.get(constants.IDISK_VG, vgname)
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
      disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
                                      disk[constants.IDISK_SIZE],
                                      [data_vg, meta_vg],
                                      names[idx * 2:idx * 2 + 2],
                                      "disk/%d" % disk_index,
                                      minors[idx * 2], minors[idx * 2 + 1])
      disk_dev.mode = disk[constants.IDISK_MODE]
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disks.append(disk_dev)
  else:
    if secondary_node_uuids:
      raise errors.ProgrammerError("Wrong template configuration")

    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
    if name_prefix is None:
      names = None
    else:
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
                                        (name_prefix, base_index + i)
                                        for i in range(disk_count)])

    if template_name == constants.DT_PLAIN:

      def logical_id_fn(idx, _, disk):
        vg = disk.get(constants.IDISK_VG, vgname)
        return (vg, names[idx])

    elif template_name == constants.DT_GLUSTER:
      logical_id_fn = lambda _1, disk_index, _2: \
        (file_driver, "ganeti/%s.%d" % (instance_uuid,
                                        disk_index))

    elif template_name in constants.DTS_FILEBASED: # Gluster handled above
      logical_id_fn = \
        lambda _, disk_index, disk: (file_driver,
                                     "%s/%s" % (file_storage_dir,
                                                names[idx]))
    elif template_name == constants.DT_BLOCK:
      logical_id_fn = \
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
                                       disk[constants.IDISK_ADOPT])
    elif template_name == constants.DT_RBD:
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
    elif template_name == constants.DT_EXT:
      def logical_id_fn(idx, _, disk):
        provider = disk.get(constants.IDISK_PROVIDER, None)
        if provider is None:
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
                                       " not found", constants.DT_EXT,
                                       constants.IDISK_PROVIDER)
        return (provider, names[idx])
    else:
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)

    dev_type = template_name

    for idx, disk in enumerate(disk_info):
      params = {}
      # Only for the Ext template add disk_info to params
      if template_name == constants.DT_EXT:
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            params[key] = disk[key]
      disk_index = idx + base_index
      size = disk[constants.IDISK_SIZE]
      feedback_fn("* disk %s, size %s" %
                  (disk_index, utils.FormatUnit(size, "h")))
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
                              logical_id=logical_id_fn(idx, disk_index, disk),
                              iv_name="disk/%d" % disk_index,
                              mode=disk[constants.IDISK_MODE],
                              params=params,
                              spindles=disk.get(constants.IDISK_SPINDLES))
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
      disks.append(disk_dev)

  return disks


def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
  """Check the presence of the spindle options with exclusive_storage.

  @type diskdict: dict
  @param diskdict: disk parameters
  @type es_flag: bool
  @param es_flag: the effective value of the exlusive_storage flag
  @type required: bool
  @param required: whether spindles are required or just optional
  @raise errors.OpPrereqError when spindles are given and they should not

  """
  if (not es_flag and constants.IDISK_SPINDLES in diskdict and
      diskdict[constants.IDISK_SPINDLES] is not None):
    raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
                               " when exclusive storage is not active",
                               errors.ECODE_INVAL)
  if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or
                                diskdict[constants.IDISK_SPINDLES] is None)):
    raise errors.OpPrereqError("You must specify spindles in instance disks"
                               " when exclusive storage is active",
                               errors.ECODE_INVAL)

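# Illustrative note (not part of the upstream file): CheckSpindlesExclusiveStorage
# above raises errors.OpPrereqError for a disk dict carrying a non-None
# constants.IDISK_SPINDLES value while exclusive storage is off, and likewise
# when exclusive storage is on, required=True and no spindle count is given;
# every other combination passes silently.
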
class LUInstanceRecreateDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  _MODIFYABLE = compat.UniqueFrozenset([
    constants.IDISK_SIZE,
    constants.IDISK_MODE,
    constants.IDISK_SPINDLES,
    ])

  # New or changed disk parameters may have different semantics
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
    constants.IDISK_ADOPT,

    # TODO: Implement support changing VG while recreating
    constants.IDISK_VG,
    constants.IDISK_METAVG,
    constants.IDISK_PROVIDER,
    constants.IDISK_NAME,
    ]))

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)

    # FIXME
    # The allocator should actually run in "relocate" mode, but current
    # allocators don't support relocating all the nodes of an instance at
    # the same time. As a workaround we use "allocate" mode, but this is
    # suboptimal for two reasons:
    # - The instance name passed to the allocator is present in the list of
    #   existing instances, so there could be a conflict within the
    #   internal structures of the allocator. This doesn't happen with the
    #   current allocators, but it's a liability.
    # - The allocator counts the resources used by the instance twice: once
    #   because the instance exists already, and once because it tries to
    #   allocate a new instance.
    # The allocator could choose some of the nodes on which the instance is
    # running, but that's not a problem. If the instance nodes are broken,
    # they should be already be marked as drained or offline, and hence
    # skipped by the allocator. If instance disks have been lost for other
    # reasons, then recreating the disks on the same nodes should be fine.
    disk_template = self.instance.disk_template
    spindle_use = be_full[constants.BE_SPINDLE_USE]
    disks = [{
      constants.IDISK_SIZE: d.size,
      constants.IDISK_MODE: d.mode,
      constants.IDISK_SPINDLES: d.spindles,
      } for d in self.instance.disks]
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
                                        disk_template=disk_template,
                                        tags=list(self.instance.GetTags()),
                                        os=self.instance.os,
                                        nics=[{}],
                                        vcpus=be_full[constants.BE_VCPUS],
                                        memory=be_full[constants.BE_MAXMEM],
                                        spindle_use=spindle_use,
                                        disks=disks,
                                        hypervisor=self.instance.hypervisor,
                                        node_whitelist=None)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    assert req.RequiredNodes() == len(self.instance.all_nodes)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 utils.CommaJoin(self.op.nodes))

  def CheckArguments(self):
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
      # Normalize and convert deprecated list of disk indices
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]

    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
    if duplicates:
      raise errors.OpPrereqError("Some disks have been specified more than"
                                 " once: %s" % utils.CommaJoin(duplicates),
                                 errors.ECODE_INVAL)

    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
    # when neither iallocator nor nodes are specified
    if self.op.iallocator or self.op.nodes:
      CheckIAllocatorOrNode(self, "iallocator", "nodes")

    for (idx, params) in self.op.disks:
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
      if unsupported:
        raise errors.OpPrereqError("Parameters for disk %s try to change"
                                   " unmodifyable parameter(s): %s" %
                                   (idx, utils.CommaJoin(unsupported)),
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    if self.op.nodes:
      (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      if self.op.iallocator:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.iallocator is not None
      assert not self.op.nodes
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock the primary group used by the instance optimistically; this
      # requires going via the node before it's locked, requiring
      # verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, primary_only=True)

    elif level == locking.LEVEL_NODE:
      # If an allocator is used, then we lock all the nodes in the current
      # instance group, as we don't know yet which ones will be selected;
      # if we replace the nodes without using an allocator, locks are
      # already declared in ExpandNames; otherwise, we need to lock all the
      # instance nodes for disk re-creation
      if self.op.iallocator:
        assert not self.op.nodes
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1

        # Lock member nodes of the group of the primary node
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
          self.needed_locks[locking.LEVEL_NODE].extend(
            self.cfg.GetNodeGroup(group_uuid).members)

        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
      elif not self.op.nodes:
        self._LockInstancesNodes(primary_only=False)
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    return BuildInstanceHookEnvByObject(self, self.instance)

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    if self.op.node_uuids:
      if len(self.op.node_uuids) != len(instance.all_nodes):
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
                                   " %d replacement nodes were specified" %
                                   (instance.name, len(instance.all_nodes),
                                    len(self.op.node_uuids)),
                                   errors.ECODE_INVAL)
      assert instance.disk_template != constants.DT_DRBD8 or \
             len(self.op.node_uuids) == 2
      assert instance.disk_template != constants.DT_PLAIN or \
             len(self.op.node_uuids) == 1
      primary_node = self.op.node_uuids[0]
    else:
      primary_node = instance.primary_node
    if not self.op.iallocator:
      CheckNodeOnline(self, primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name, errors.ECODE_INVAL)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      # Node group locks are acquired only for the primary node (and only
      # when the allocator is used)
      CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
                              primary_only=True)

    # if we replace nodes *and* the old primary is offline, we don't
    # check the instance state
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
    if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                         msg="cannot recreate disks")

    if self.op.disks:
      self.disks = dict(self.op.disks)
    else:
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))

    maxidx = max(self.disks.keys())
    if maxidx >= len(instance.disks):
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
                                 errors.ECODE_INVAL)

    if ((self.op.node_uuids or self.op.iallocator) and
         sorted(self.disks.keys()) != range(len(instance.disks))):
      raise errors.OpPrereqError("Can't recreate disks partially and"
                                 " change the nodes at the same time",
                                 errors.ECODE_INVAL)

    self.instance = instance

    if self.op.iallocator:
      self._RunAllocator()
      # Release unneeded node and node resource locks
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)

    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

    if self.op.node_uuids:
      node_uuids = self.op.node_uuids
    else:
      node_uuids = instance.all_nodes
    excl_stor = compat.any(
      rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
      )
    for new_params in self.disks.values():
      CheckSpindlesExclusiveStorage(new_params, excl_stor, False)

  def Exec(self, feedback_fn):
    """Recreate the disks.

    """
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    to_skip = []
    mods = [] # keeps track of needed changes

    for idx, disk in enumerate(self.instance.disks):
      try:
        changes = self.disks[idx]
      except KeyError:
        # Disk should not be recreated
        to_skip.append(idx)
        continue

      # update secondaries for disks, if needed
      if self.op.node_uuids and disk.dev_type == constants.DT_DRBD8:
        # need to update the nodes and minors
        assert len(self.op.node_uuids) == 2
        assert len(disk.logical_id) == 6 # otherwise disk internals
                                         # have changed
        (_, _, old_port, _, _, old_secret) = disk.logical_id
        new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
                                                self.instance.uuid)
        new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
                  new_minors[0], new_minors[1], old_secret)
        assert len(disk.logical_id) == len(new_id)
      else:
        new_id = None

      mods.append((idx, new_id, changes))

    # now that we have passed all asserts above, we can apply the mods
    # in a single run (to avoid partial changes)
    for idx, new_id, changes in mods:
      disk = self.instance.disks[idx]
      if new_id is not None:
        assert disk.dev_type == constants.DT_DRBD8
        disk.logical_id = new_id
      if changes:
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
                    mode=changes.get(constants.IDISK_MODE, None),
                    spindles=changes.get(constants.IDISK_SPINDLES, None))

    # change primary node, if needed
    if self.op.node_uuids:
      self.instance.primary_node = self.op.node_uuids[0]
      self.LogWarning("Changing the instance's nodes, you will have to"
                      " remove any disks left on the older nodes manually")

    if self.op.node_uuids:
      self.cfg.Update(self.instance, feedback_fn)

    # All touched nodes must be locked
    mylocks = self.owned_locks(locking.LEVEL_NODE)
    assert mylocks.issuperset(frozenset(self.instance.all_nodes))
    new_disks = CreateDisks(self, self.instance, to_skip=to_skip)

    # TODO: Release node locks before wiping, or explain why it's not possible
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
      wipedisks = [(idx, disk, 0)
                   for (idx, disk) in enumerate(self.instance.disks)
                   if idx not in to_skip]
      WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
                         cleanup=new_disks)


def _PerformNodeInfoCall(lu, node_uuids, vg):
  """Prepares the input and performs a node info call.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: list of string
  @param node_uuids: list of node UUIDs to perform the call for
  @type vg: string
  @param vg: the volume group's name

  """
  lvm_storage_units = [(constants.ST_LVM_VG, vg)]
  storage_units = rpc.PrepareStorageUnitsForNodes(lu.cfg, lvm_storage_units,
                                                  node_uuids)
  hvname = lu.cfg.GetHypervisorType()
  hvparams = lu.cfg.GetClusterInfo().hvparams
  nodeinfo = lu.rpc.call_node_info(node_uuids, storage_units,
                                   [(hvname, hvparams[hvname])])
  return nodeinfo


def _CheckVgCapacityForNode(node_name, node_info, vg, requested):
  """Checks the vg capacity for a given node.

  @type node_info: tuple (_, list of dicts, _)
  @param node_info: the result of the node info call for one node
  @type node_name: string
  @param node_name: the name of the node
  @type vg: string
  @param vg: volume group name
  @type requested: int
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  (_, space_info, _) = node_info
  lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType(
      space_info, constants.ST_LVM_VG)
  if not lvm_vg_info:
    raise errors.OpPrereqError("Can't retrieve storage information for LVM")
  vg_free = lvm_vg_info.get("storage_free", None)
  if not isinstance(vg_free, int):
    raise errors.OpPrereqError("Can't compute free disk space on node"
                               " %s for vg %s, result was '%s'" %
                               (node_name, vg, vg_free), errors.ECODE_ENVIRON)
  if requested > vg_free:
    raise errors.OpPrereqError("Not enough disk space on target node %s"
                               " vg %s: required %d MiB, available %d MiB" %
                               (node_name, vg, requested, vg_free),
                               errors.ECODE_NORES)


def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
  """Checks if nodes have enough free disk space in the specified VG.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type vg: C{str}
  @param vg: the volume group to check
  @type requested: C{int}
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  nodeinfo = _PerformNodeInfoCall(lu, node_uuids, vg)
  for node_uuid in node_uuids:
    node_name = lu.cfg.GetNodeName(node_uuid)
    info = nodeinfo[node_uuid]
    info.Raise("Cannot get current information from node %s" % node_name,
               prereq=True, ecode=errors.ECODE_ENVIRON)
    _CheckVgCapacityForNode(node_name, info.payload, vg, requested)


def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
  """Checks if nodes have enough free disk space in all the VGs.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type req_sizes: C{dict}
  @param req_sizes: the hash of vg and corresponding amount of disk in
      MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  for vg, req_size in req_sizes.items():
    _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, req_size)


def _DiskSizeInBytesToMebibytes(lu, size):
  """Converts a disk size in bytes to mebibytes.

  Warns and rounds up if the size isn't an even multiple of 1 MiB.

  """
  (mib, remainder) = divmod(size, 1024 * 1024)

  if remainder != 0:
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
                  " to not overwrite existing data (%s bytes will not be"
                  " wiped)", (1024 * 1024) - remainder)
    mib += 1

  return mib


def _CalcEta(time_taken, written, total_size):
  """Calculates the ETA based on size written and total size.

  @param time_taken: The time taken so far
  @param written: amount written so far
  @param total_size: The total size of data to be written
  @return: The remaining time in seconds

  """
  avg_time = time_taken / float(written)
  return (total_size - written) * avg_time

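# Illustrative note (not part of the upstream file): _CalcEta above is plain
# proportional extrapolation; e.g. 2048 MiB written in 60 s gives
# avg_time = 60 / 2048.0 ~= 0.0293 s/MiB, so the remaining 8192 MiB of a
# 10240 MiB disk yield an ETA of 8192 * 0.0293 ~= 240 s.
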
def WipeDisks(lu, instance, disks=None):
1005
  """Wipes instance disks.
1006

1007
  @type lu: L{LogicalUnit}
1008
  @param lu: the logical unit on whose behalf we execute
1009
  @type instance: L{objects.Instance}
1010
  @param instance: the instance whose disks we should create
1011
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
1012
  @param disks: Disk details; tuple contains disk index, disk object and the
1013
    start offset
1014

1015
  """
1016
  node_uuid = instance.primary_node
1017
  node_name = lu.cfg.GetNodeName(node_uuid)
1018

    
1019
  if disks is None:
1020
    disks = [(idx, disk, 0)
1021
             for (idx, disk) in enumerate(instance.disks)]
1022

    
1023
  logging.info("Pausing synchronization of disks of instance '%s'",
1024
               instance.name)
1025
  result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1026
                                                  (map(compat.snd, disks),
1027
                                                   instance),
1028
                                                  True)
1029
  result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)
1030

    
1031
  for idx, success in enumerate(result.payload):
1032
    if not success:
1033
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
1034
                   " failed", idx, instance.name)
1035

    
1036
  try:
1037
    for (idx, device, offset) in disks:
1038
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
1039
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
1040
      wipe_chunk_size = \
1041
        int(min(constants.MAX_WIPE_CHUNK,
1042
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
1043

    
1044
      size = device.size
1045
      last_output = 0
1046
      start_time = time.time()
1047

    
1048
      if offset == 0:
1049
        info_text = ""
1050
      else:
1051
        info_text = (" (from %s to %s)" %
1052
                     (utils.FormatUnit(offset, "h"),
1053
                      utils.FormatUnit(size, "h")))
1054

    
1055
      lu.LogInfo("* Wiping disk %s%s", idx, info_text)
1056

    
1057
      logging.info("Wiping disk %d for instance %s on node %s using"
1058
                   " chunk size %s", idx, instance.name, node_name,
1059
                   wipe_chunk_size)
1060

    
1061
      while offset < size:
1062
        wipe_size = min(wipe_chunk_size, size - offset)
1063

    
1064
        logging.debug("Wiping disk %d, offset %s, chunk %s",
1065
                      idx, offset, wipe_size)
1066

    
1067
        result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
1068
                                           offset, wipe_size)
1069
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
1070
                     (idx, offset, wipe_size))
1071

    
1072
        now = time.time()
1073
        offset += wipe_size
1074
        if now - last_output >= 60:
1075
          eta = _CalcEta(now - start_time, offset, size)
1076
          lu.LogInfo(" - done: %.1f%% ETA: %s",
1077
                     offset / float(size) * 100, utils.FormatSeconds(eta))
1078
          last_output = now
1079
  finally:
1080
    logging.info("Resuming synchronization of disks for instance '%s'",
1081
                 instance.name)
1082

    
1083
    result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1084
                                                    (map(compat.snd, disks),
1085
                                                     instance),
1086
                                                    False)
1087

    
1088
    if result.fail_msg:
1089
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
1090
                    node_name, result.fail_msg)
1091
    else:
1092
      for idx, success in enumerate(result.payload):
1093
        if not success:
1094
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
1095
                        " failed", idx, instance.name)
1096

    
1097

    
1098
def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
1099
  """Wrapper for L{WipeDisks} that handles errors.
1100

1101
  @type lu: L{LogicalUnit}
1102
  @param lu: the logical unit on whose behalf we execute
1103
  @type instance: L{objects.Instance}
1104
  @param instance: the instance whose disks we should wipe
1105
  @param disks: see L{WipeDisks}
1106
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
1107
      case of error
1108
  @raise errors.OpPrereqError: in case of failure
1109

1110
  """
1111
  try:
1112
    WipeDisks(lu, instance, disks=disks)
1113
  except errors.OpExecError:
1114
    logging.warning("Wiping disks for instance '%s' failed",
1115
                    instance.name)
1116
    _UndoCreateDisks(lu, cleanup, instance)
1117
    raise
1118

    
1119

    
1120
def ExpandCheckDisks(instance, disks):
1121
  """Return the instance disks selected by the disks list
1122

1123
  @type disks: list of L{objects.Disk} or None
1124
  @param disks: selected disks
1125
  @rtype: list of L{objects.Disk}
1126
  @return: selected instance disks to act on
1127

1128
  """
1129
  if disks is None:
1130
    return instance.disks
1131
  else:
1132
    if not set(disks).issubset(instance.disks):
1133
      raise errors.ProgrammerError("Can only act on disks belonging to the"
1134
                                   " target instance: expected a subset of %r,"
1135
                                   " got %r" % (instance.disks, disks))
1136
    return disks
1137

    
1138

    
1139
def WaitForSync(lu, instance, disks=None, oneshot=False):
1140
  """Sleep and poll for an instance's disk to sync.
1141

1142
  """
1143
  if not instance.disks or disks is not None and not disks:
1144
    return True
1145

    
1146
  disks = ExpandCheckDisks(instance, disks)
1147

    
1148
  if not oneshot:
1149
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1150

    
1151
  node_uuid = instance.primary_node
1152
  node_name = lu.cfg.GetNodeName(node_uuid)
1153

    
1154
  # TODO: Convert to utils.Retry
1155

    
1156
  retries = 0
1157
  degr_retries = 10 # in seconds, as we sleep 1 second each time
1158
  while True:
1159
    max_time = 0
1160
    done = True
1161
    cumul_degraded = False
1162
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
1163
    msg = rstats.fail_msg
1164
    if msg:
1165
      lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
1166
      retries += 1
1167
      if retries >= 10:
1168
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1169
                                 " aborting." % node_name)
1170
      time.sleep(6)
1171
      continue
1172
    rstats = rstats.payload
1173
    retries = 0
1174
    for i, mstat in enumerate(rstats):
1175
      if mstat is None:
1176
        lu.LogWarning("Can't compute data for node %s/%s",
1177
                      node_name, disks[i].iv_name)
1178
        continue
1179

    
1180
      cumul_degraded = (cumul_degraded or
1181
                        (mstat.is_degraded and mstat.sync_percent is None))
1182
      if mstat.sync_percent is not None:
1183
        done = False
1184
        if mstat.estimated_time is not None:
1185
          rem_time = ("%s remaining (estimated)" %
1186
                      utils.FormatSeconds(mstat.estimated_time))
1187
          max_time = mstat.estimated_time
1188
        else:
1189
          rem_time = "no time estimate"
1190
        lu.LogInfo("- device %s: %5.2f%% done, %s",
1191
                   disks[i].iv_name, mstat.sync_percent, rem_time)
1192

    
1193
    # if we're done but degraded, let's do a few small retries, to
1194
    # make sure we see a stable and not transient situation; therefore
1195
    # we force restart of the loop
1196
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
1197
      logging.info("Degraded disks found, %d retries left", degr_retries)
1198
      degr_retries -= 1
1199
      time.sleep(1)
1200
      continue
1201

    
1202
    if done or oneshot:
1203
      break
1204

    
1205
    time.sleep(min(60, max_time))
1206

    
1207
  if done:
1208
    lu.LogInfo("Instance %s's disks are in sync", instance.name)
1209

    
1210
  return not cumul_degraded
1211

    
1212

    
1213
def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1214
  """Shutdown block devices of an instance.
1215

1216
  This does the shutdown on all nodes of the instance.
1217

1218
  If the ignore_primary is false, errors on the primary node are
1219
  ignored.
1220

1221
  """
1222
  all_result = True
1223

    
1224
  if disks is None:
1225
    # only mark instance disks as inactive if all disks are affected
1226
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1227
  disks = ExpandCheckDisks(instance, disks)
1228

    
1229
  for disk in disks:
1230
    for node_uuid, top_disk in disk.ComputeNodeTree(instance.primary_node):
1231
      result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
1232
      msg = result.fail_msg
1233
      if msg:
1234
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1235
                      disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1236
        if ((node_uuid == instance.primary_node and not ignore_primary) or
1237
            (node_uuid != instance.primary_node and not result.offline)):
1238
          all_result = False
1239
  return all_result
1240

    
1241

    
1242
def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1243
  """Shutdown block devices of an instance.
1244

1245
  This function checks if an instance is running, before calling
1246
  _ShutdownInstanceDisks.
1247

1248
  """
1249
  CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1250
  ShutdownInstanceDisks(lu, instance, disks=disks)
1251

    
1252

    
1253
def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1254
                          ignore_size=False):
1255
  """Prepare the block devices for an instance.
1256

1257
  This sets up the block devices on all nodes.
1258

1259
  @type lu: L{LogicalUnit}
1260
  @param lu: the logical unit on whose behalf we execute
1261
  @type instance: L{objects.Instance}
1262
  @param instance: the instance for whose disks we assemble
1263
  @type disks: list of L{objects.Disk} or None
1264
  @param disks: which disks to assemble (or all, if None)
1265
  @type ignore_secondaries: boolean
1266
  @param ignore_secondaries: if true, errors on secondary nodes
1267
      won't result in an error return from the function
1268
  @type ignore_size: boolean
1269
  @param ignore_size: if true, the current known size of the disk
1270
      will not be used during the disk activation, useful for cases
1271
      when the size is wrong
1272
  @return: False if the operation failed, otherwise a list of
1273
      (host, instance_visible_name, node_visible_name)
1274
      with the mapping from node devices to instance devices
1275

1276
  """
1277
  device_info = []
1278
  disks_ok = True
1279

    
1280
  if disks is None:
1281
    # only mark instance disks as active if all disks are affected
1282
    lu.cfg.MarkInstanceDisksActive(instance.uuid)
1283

    
1284
  disks = ExpandCheckDisks(instance, disks)
1285

    
1286
  # With the two passes mechanism we try to reduce the window of
1287
  # opportunity for the race condition of switching DRBD to primary
1288
  # before handshaking occured, but we do not eliminate it
1289

    
1290
  # The proper fix would be to wait (with some limits) until the
1291
  # connection has been made and drbd transitions from WFConnection
1292
  # into any other network-connected state (Connected, SyncTarget,
1293
  # SyncSource, etc.)
1294

    
1295
  # 1st pass, assemble on all nodes in secondary mode
1296
  for idx, inst_disk in enumerate(disks):
1297
    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
1298
                                  instance.primary_node):
1299
      if ignore_size:
1300
        node_disk = node_disk.Copy()
1301
        node_disk.UnsetSize()
1302
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
1303
                                             instance.name, False, idx)
1304
      msg = result.fail_msg
1305
      if msg:
1306
        secondary_nodes = lu.cfg.GetInstanceSecondaryNodes(instance)
1307
        is_offline_secondary = (node_uuid in secondary_nodes and
1308
                                result.offline)
1309
        lu.LogWarning("Could not prepare block device %s on node %s"
1310
                      " (is_primary=False, pass=1): %s",
1311
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1312
        if not (ignore_secondaries or is_offline_secondary):
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for idx, inst_disk in enumerate(disks):
    dev_path = None

    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
                                  instance.primary_node):
      if node_uuid != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                             instance.name, True, idx)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=True, pass=2): %s",
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        disks_ok = False
      else:
        dev_path, _ = result.payload

    device_info.append((lu.cfg.GetNodeName(instance.primary_node),
                        inst_disk.iv_name, dev_path))

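  # If any device could not be assembled, clear the "disks active" flag so
  # the configuration does not report the disks as usable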
  if not disks_ok:
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)

  return disks_ok, device_info

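# Example use of AssembleInstanceDisks (a sketch only; LUInstanceActivateDisks
# below shows the real pattern): callers check the boolean and abort on
# failure, e.g.
#
#   disks_ok, device_info = AssembleInstanceDisks(lu, instance)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")
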
def StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = AssembleInstanceDisks(lu, instance,
                                      ignore_secondaries=force)
  if not disks_ok:
    ShutdownInstanceDisks(lu, instance)
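    # The warning text is empty on purpose: AssembleInstanceDisks has already
    # logged the per-disk errors, so only the --force hint is emitted here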
    if force is not None and not force:
      lu.LogWarning("",
                    hint=("If the message above refers to a secondary node,"
                          " you can retry the operation using '--force'"))
    raise errors.OpExecError("Disk consistency error")


class LUInstanceGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.needed_locks[locking.LEVEL_NODE_RES] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      "ABSOLUTE": self.op.absolute,
      }
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    node_uuids = list(self.instance.all_nodes)
    for node_uuid in node_uuids:
      CheckNodeOnline(self, node_uuid)
    self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)

    if self.instance.disk_template not in constants.DTS_GROWABLE:
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing", errors.ECODE_INVAL)

    self.disk = self.instance.FindDisk(self.op.disk)

    if self.op.absolute:
      self.target = self.op.amount
      self.delta = self.target - self.disk.size
      if self.delta < 0:
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
                                   "current disk size (%s)" %
                                   (utils.FormatUnit(self.target, "h"),
                                    utils.FormatUnit(self.disk.size, "h")),
                                   errors.ECODE_STATE)
    else:
      self.delta = self.op.amount
      self.target = self.disk.size + self.delta
      if self.delta < 0:
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
                                   utils.FormatUnit(self.delta, "h"),
                                   errors.ECODE_INVAL)

    self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta))

  def _CheckDiskSpace(self, node_uuids, req_vgspace):
    template = self.instance.disk_template
    if (template not in (constants.DTS_NO_FREE_SPACE_CHECK) and
        not any(self.node_es_flags.values())):
      # TODO: check the free disk space for file, when that feature will be
      # supported
      # With exclusive storage we need to do something smarter than just looking
      # at free space, which, in the end, is basically a dry run. So we rely on
      # the dry run performed in Exec() instead.
      CheckNodesFreeDiskPerVG(self, node_uuids, req_vgspace)

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks

    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[self.disk])
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block device to grow")

    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
                (self.op.disk, self.instance.name,
                 utils.FormatUnit(self.delta, "h"),
                 utils.FormatUnit(self.target, "h")))

    # First run all grow ops in dry-run mode
    for node_uuid in self.instance.all_nodes:
      result = self.rpc.call_blockdev_grow(node_uuid,
                                           (self.disk, self.instance),
                                           self.delta, True, True,
                                           self.node_es_flags[node_uuid])
      result.Raise("Dry-run grow request failed to node %s" %
                   self.cfg.GetNodeName(node_uuid))

    if wipe_disks:
      # Get disk size from primary node for wiping
      result = self.rpc.call_blockdev_getdimensions(
                 self.instance.primary_node, [([self.disk], self.instance)])
      result.Raise("Failed to retrieve disk size from node '%s'" %
                   self.instance.primary_node)

      (disk_dimensions, ) = result.payload

      if disk_dimensions is None:
        raise errors.OpExecError("Failed to retrieve disk size from primary"
                                 " node '%s'" % self.instance.primary_node)
      (disk_size_in_bytes, _) = disk_dimensions

      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)

      assert old_disk_size >= self.disk.size, \
        ("Retrieved disk size too small (got %s, should be at least %s)" %
         (old_disk_size, self.disk.size))
    else:
      old_disk_size = None

    # We know that (as far as we can test) operations across different
    # nodes will succeed, time to run it for real on the backing storage
    for node_uuid in self.instance.all_nodes:
      result = self.rpc.call_blockdev_grow(node_uuid,
                                           (self.disk, self.instance),
                                           self.delta, False, True,
                                           self.node_es_flags[node_uuid])
      result.Raise("Grow request failed to node %s" %
                   self.cfg.GetNodeName(node_uuid))

    # And now execute it for logical storage, on the primary node
    node_uuid = self.instance.primary_node
    result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance),
                                         self.delta, False, False,
                                         self.node_es_flags[node_uuid])
    result.Raise("Grow request failed to node %s" %
                 self.cfg.GetNodeName(node_uuid))

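    # Every per-node grow call succeeded: record the new size in the
    # configuration before waiting for the resync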
    self.disk.RecordGrow(self.delta)
    self.cfg.Update(self.instance, feedback_fn)

    # Changes have been recorded, release node lock
    ReleaseLocks(self, locking.LEVEL_NODE)

    # Downgrade lock while waiting for sync
    self.glm.downgrade(locking.LEVEL_INSTANCE)

    assert wipe_disks ^ (old_disk_size is None)

    if wipe_disks:
      assert self.instance.disks[self.op.disk] == self.disk

      # Wipe newly added disk space
      WipeDisks(self, self.instance,
                disks=[(self.op.disk, self.disk, old_disk_size)])

    if self.op.wait_for_sync:
      disk_abort = not WaitForSync(self, self.instance, disks=[self.disk])
      if disk_abort:
        self.LogWarning("Disk syncing has not returned a good status; check"
                        " the instance")
      if not self.instance.disks_active:
        _SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk])
    elif not self.instance.disks_active:
      self.LogWarning("Not shutting down the disk even if the instance is"
                      " not supposed to be running because no wait for"
                      " sync mode was requested")

    assert self.owned_locks(locking.LEVEL_NODE_RES)
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)


class LUInstanceReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check arguments.

    """
    if self.op.mode == constants.REPLACE_DISK_CHG:
      if self.op.remote_node is None and self.op.iallocator is None:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given", errors.ECODE_INVAL)
      else:
        CheckIAllocatorOrNode(self, "iallocator", "remote_node")

    elif self.op.remote_node is not None or self.op.iallocator is not None:
      # Not replacing the secondary
      raise errors.OpPrereqError("The iallocator and new node options can"
                                 " only be used when changing the"
                                 " secondary node", errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    assert locking.LEVEL_NODE not in self.needed_locks
    assert locking.LEVEL_NODE_RES not in self.needed_locks
    assert locking.LEVEL_NODEGROUP not in self.needed_locks

    assert self.op.iallocator is None or self.op.remote_node is None, \
      "Conflicting options"

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8Dev.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

      if self.op.iallocator is not None:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

    self.replacer = TLReplaceDisks(self, self.op.instance_uuid,
                                   self.op.instance_name, self.op.mode,
                                   self.op.iallocator, self.op.remote_node_uuid,
                                   self.op.disks, self.op.early_release,
                                   self.op.ignore_ipolicy)

    self.tasklets = [self.replacer]

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.remote_node_uuid is None
      assert self.op.iallocator is not None
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock all groups used by instance optimistically; this requires going
      # via the node before it's locked, requiring verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid)

    elif level == locking.LEVEL_NODE:
      if self.op.iallocator is not None:
        assert self.op.remote_node_uuid is None
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)

        # Lock member nodes of all locked groups
        self.needed_locks[locking.LEVEL_NODE] = \
          [node_uuid
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
           for node_uuid in self.cfg.GetNodeGroup(group_uuid).members]
      else:
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

        self._LockInstancesNodes()

    elif level == locking.LEVEL_NODE_RES:
      # Reuse node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        self.needed_locks[locking.LEVEL_NODE]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    instance = self.replacer.instance
    secondary_nodes = self.cfg.GetInstanceSecondaryNodes(instance)
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.cfg.GetNodeName(secondary_nodes[0]),
      }
    env.update(BuildInstanceHookEnvByObject(self, instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self.replacer.instance
    nl = [
      self.cfg.GetMasterNode(),
      instance.primary_node,
      ]
    if self.op.remote_node_uuid is not None:
      nl.append(self.op.remote_node_uuid)
    return nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
            self.op.iallocator is None)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      CheckInstanceNodeGroups(self.cfg, self.op.instance_uuid, owned_groups)

    return LogicalUnit.CheckPrereq(self)


class LUInstanceActivateDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              AssembleInstanceDisks(self, self.instance,
                                    ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    if self.op.wait_for_sync:
      if not WaitForSync(self, self.instance):
        self.cfg.MarkInstanceDisksInactive(self.instance.uuid)
        raise errors.OpExecError("Some disks of the instance are degraded!")

    return disks_info


class LUInstanceDeactivateDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    if self.op.force:
      ShutdownInstanceDisks(self, self.instance)
    else:
      _SafeShutdownInstanceDisks(self, self.instance)


def _CheckDiskConsistencyInner(lu, instance, dev, node_uuid, on_primary,
                               ldisk=False):
  """Check that mirrors are not degraded.

  @attention: The device has to be annotated already.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node_uuid, (dev, instance))
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s",
                    lu.cfg.GetNodeName(node_uuid), msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid))
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

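  # Note: child devices are recursed into with the default ldisk=False, i.e.
  # children are always judged by their is_degraded attribute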
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistencyInner(lu, instance, child,
                                                     node_uuid, on_primary)

  return result


def CheckDiskConsistency(lu, instance, dev, node_uuid, on_primary, ldisk=False):
  """Wrapper around L{_CheckDiskConsistencyInner}.

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return _CheckDiskConsistencyInner(lu, instance, disk, node_uuid, on_primary,
                                    ldisk=ldisk)


def _BlockdevFind(lu, node_uuid, dev, instance):
  """Wrapper around call_blockdev_find to annotate diskparams.

  @param lu: A reference to the lu object
  @param node_uuid: The node to call out
  @param dev: The device to find
  @param instance: The instance object the device belongs to
  @returns The result of the rpc call

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return lu.rpc.call_blockdev_find(node_uuid, (disk, instance))


def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
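  # The generated names have the form "<unique-id><ext>", e.g. (with the
  # ".diskN_data"/".diskN_meta" extensions used by _CreateNewStorage below)
  # something like "<uuid>.disk0_data"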
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
    results.append("%s%s" % (new_id, val))
  return results


class TLReplaceDisks(Tasklet):
  """Replaces disks for an instance.

  Note: Locking is not within the scope of this class.

  """
  def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name,
               remote_node_uuid, disks, early_release, ignore_ipolicy):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_uuid = instance_uuid
    self.instance_name = instance_name
    self.mode = mode
    self.iallocator_name = iallocator_name
    self.remote_node_uuid = remote_node_uuid
    self.disks = disks
    self.early_release = early_release
    self.ignore_ipolicy = ignore_ipolicy

    # Runtime data
    self.instance = None
    self.new_node_uuid = None
    self.target_node_uuid = None
    self.other_node_uuid = None
    self.remote_node_info = None
    self.node_secondary_ip = None

  @staticmethod
  def _RunAllocator(lu, iallocator_name, instance_uuid,
                    relocate_from_node_uuids):
    """Compute a new secondary node using an IAllocator.

    """
    req = iallocator.IAReqRelocate(
          inst_uuid=instance_uuid,
          relocate_from_node_uuids=list(relocate_from_node_uuids))
    ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)

    ial.Run(iallocator_name)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (iallocator_name, ial.info),
                                 errors.ECODE_NORES)

    remote_node_name = ial.result[0]
    remote_node = lu.cfg.GetNodeInfoByName(remote_node_name)

    if remote_node is None:
      raise errors.OpPrereqError("Node %s not found in configuration" %
                                 remote_node_name, errors.ECODE_NOENT)

    lu.LogInfo("Selected new secondary for instance '%s': %s",
               instance_uuid, remote_node_name)

    return remote_node.uuid

  def _FindFaultyDisks(self, node_uuid):
    """Wrapper for L{FindFaultyInstanceDisks}.

    """
    return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
                                   node_uuid, True)

  def _CheckDisksActivated(self, instance):
    """Checks if the instance disks are activated.

    @param instance: The instance to check disks
    @return: True if they are activated, False otherwise

    """
    node_uuids = instance.all_nodes

    for idx, dev in enumerate(instance.disks):
      for node_uuid in node_uuids:
        self.lu.LogInfo("Checking disk/%d on %s", idx,
                        self.cfg.GetNodeName(node_uuid))

        result = _BlockdevFind(self, node_uuid, dev, instance)

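        # Offline nodes cannot be queried at all; skip them instead of
        # treating the failed RPC as "disks not activated"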
        if result.offline:
          continue
        elif result.fail_msg or not result.payload:
          return False

    return True

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.instance_name

    if self.instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances", errors.ECODE_INVAL)

    secondary_nodes = self.cfg.GetInstanceSecondaryNodes(self.instance)
    if len(secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(secondary_nodes),
                                 errors.ECODE_FAULT)

    secondary_node_uuid = secondary_nodes[0]

    if self.iallocator_name is None:
      remote_node_uuid = self.remote_node_uuid
    else:
      remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
                                            self.instance.uuid,
                                            secondary_nodes)

    if remote_node_uuid is None:
      self.remote_node_info = None
    else:
      assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
             "Remote node '%s' is not locked" % remote_node_uuid

      self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node_uuid

    if remote_node_uuid == self.instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance", errors.ECODE_INVAL)

    if remote_node_uuid == secondary_node_uuid:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance",
                                 errors.ECODE_INVAL)

    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
                                    constants.REPLACE_DISK_CHG):
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
                                 errors.ECODE_INVAL)

    if self.mode == constants.REPLACE_DISK_AUTO:
      if not self._CheckDisksActivated(self.instance):
        raise errors.OpPrereqError("Please run activate-disks on instance %s"
                                   " first" % self.instance_name,
                                   errors.ECODE_STATE)
      faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
      faulty_secondary = self._FindFaultyDisks(secondary_node_uuid)

      if faulty_primary and faulty_secondary:
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
                                   " one node and can not be repaired"
                                   " automatically" % self.instance_name,
                                   errors.ECODE_STATE)

      if faulty_primary:
        self.disks = faulty_primary
        self.target_node_uuid = self.instance.primary_node
        self.other_node_uuid = secondary_node_uuid
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
      elif faulty_secondary:
        self.disks = faulty_secondary
        self.target_node_uuid = secondary_node_uuid
        self.other_node_uuid = self.instance.primary_node
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
      else:
        self.disks = []
        check_nodes = []

    else:
      # Non-automatic modes
      if self.mode == constants.REPLACE_DISK_PRI:
        self.target_node_uuid = self.instance.primary_node
        self.other_node_uuid = secondary_node_uuid
        check_nodes = [self.target_node_uuid, self.other_node_uuid]

      elif self.mode == constants.REPLACE_DISK_SEC:
        self.target_node_uuid = secondary_node_uuid
        self.other_node_uuid = self.instance.primary_node
        check_nodes = [self.target_node_uuid, self.other_node_uuid]

      elif self.mode == constants.REPLACE_DISK_CHG:
        self.new_node_uuid = remote_node_uuid
        self.other_node_uuid = self.instance.primary_node
        self.target_node_uuid = secondary_node_uuid
        check_nodes = [self.new_node_uuid, self.other_node_uuid]

        CheckNodeNotDrained(self.lu, remote_node_uuid)
        CheckNodeVmCapable(self.lu, remote_node_uuid)

        old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid)
        assert old_node_info is not None
        if old_node_info.offline and not self.early_release:
          # doesn't make sense to delay the release
          self.early_release = True
          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
                          " early-release mode", secondary_node_uuid)

      else:
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                     self.mode)

      # If not specified all disks should be replaced
      if not self.disks:
        self.disks = range(len(self.instance.disks))

    # TODO: This is ugly, but right now we can't distinguish between internal
    # submitted opcode and external one. We should fix that.
    if self.remote_node_info:
      # We change the node, lets verify it still meets instance policy
      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
      cluster = self.cfg.GetClusterInfo()
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              new_group_info)
      CheckTargetNodeIPolicy(self, ipolicy, self.instance,
                             self.remote_node_info, self.cfg,
                             ignore=self.ignore_ipolicy)

    for node_uuid in check_nodes:
      CheckNodeOnline(self.lu, node_uuid)

    touched_nodes = frozenset(node_uuid for node_uuid in [self.new_node_uuid,
                                                          self.other_node_uuid,
                                                          self.target_node_uuid]
                              if node_uuid is not None)

    # Release unneeded node and node resource locks
    ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    # Release any owned node group
    ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)

    # Check whether disks are valid
    for disk_idx in self.disks:
      self.instance.FindDisk(disk_idx)

    # Get secondary node IP addresses
    self.node_secondary_ip = dict((uuid, node.secondary_ip) for (uuid, node)
                                  in self.cfg.GetMultiNodeInfo(touched_nodes))

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    if __debug__:
      # Verify owned locks before starting operation
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
      assert set(owned_nodes) == set(self.node_secondary_ip), \
          ("Incorrect node locks, owning %s, expected %s" %
           (owned_nodes, self.node_secondary_ip.keys()))
      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
              self.lu.owned_locks(locking.LEVEL_NODE_RES))
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
      assert list(owned_instances) == [self.instance_name], \
          "Instance '%s' not locked" % self.instance_name

      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
          "Should not own any node group lock at this point"

    if not self.disks:
      feedback_fn("No disks need replacement for instance '%s'" %
                  self.instance.name)
      return

    feedback_fn("Replacing disk(s) %s for instance '%s'" %
                (utils.CommaJoin(self.disks), self.instance.name))
    feedback_fn("Current primary node: %s" %
                self.cfg.GetNodeName(self.instance.primary_node))
    secondary_nodes = self.cfg.GetInstanceSecondaryNodes(self.instance)
    feedback_fn("Current secondary node: %s" %
                utils.CommaJoin(self.cfg.GetNodeNames(secondary_nodes)))

    activate_disks = not self.instance.disks_active

    # Activate the instance disks if we're replacing them on a down instance
    if activate_disks:
      StartInstanceDisks(self.lu, self.instance, True)

    try:
      # Should we replace the secondary node?
      if self.new_node_uuid is not None:
        fn = self._ExecDrbd8Secondary
      else:
        fn = self._ExecDrbd8DiskOnly

      result = fn(feedback_fn)
    finally:
      # Deactivate the instance disks if we're replacing them on a
      # down instance
      if activate_disks:
        _SafeShutdownInstanceDisks(self.lu, self.instance)

    assert not self.lu.owned_locks(locking.LEVEL_NODE)

    if __debug__:
      # Verify owned locks
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
      nodes = frozenset(self.node_secondary_ip)
      assert ((self.early_release and not owned_nodes) or
              (not self.early_release and not (set(owned_nodes) - nodes))), \
        ("Not owning the correct locks, early_release=%s, owned=%r,"
         " nodes=%r" % (self.early_release, owned_nodes, nodes))

    return result

  def _CheckVolumeGroup(self, node_uuids):
    self.lu.LogInfo("Checking volume groups")

    vgname = self.cfg.GetVGName()

    # Make sure volume group exists on all involved nodes
    results = self.rpc.call_vg_list(node_uuids)
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")

    for node_uuid in node_uuids:
      res = results[node_uuid]
      res.Raise("Error checking node %s" % self.cfg.GetNodeName(node_uuid))
      if vgname not in res.payload:
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
                                 (vgname, self.cfg.GetNodeName(node_uuid)))

  def _CheckDisksExistence(self, node_uuids):
    # Check disk existence
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      for node_uuid in node_uuids:
        self.lu.LogInfo("Checking disk/%d on %s", idx,
                        self.cfg.GetNodeName(node_uuid))

        result = _BlockdevFind(self, node_uuid, dev, self.instance)

        msg = result.fail_msg
        if msg or not result.payload:
          if not msg:
            msg = "disk not found"
          if not self._CheckDisksActivated(self.instance):
            extra_hint = ("\nDisks seem to be not properly activated. Try"
                          " running activate-disks on the instance before"
                          " using replace-disks.")
          else:
            extra_hint = ""
          raise errors.OpExecError("Can't find disk/%d on node %s: %s%s" %
                                   (idx, self.cfg.GetNodeName(node_uuid), msg,
                                    extra_hint))

  def _CheckDisksConsistency(self, node_uuid, on_primary, ldisk):
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                      (idx, self.cfg.GetNodeName(node_uuid)))

      if not CheckDiskConsistency(self.lu, self.instance, dev, node_uuid,
                                  on_primary, ldisk=ldisk):
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                                 " replace disks for instance %s" %
                                 (self.cfg.GetNodeName(node_uuid),
                                  self.instance.name))

  def _CreateNewStorage(self, node_uuid):
    """Create new storage on the primary or secondary node.

    This is only used for same-node replaces, not for changing the
    secondary node, hence we don't want to modify the existing disk.

    """
    iv_names = {}

    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    for idx, dev in enumerate(disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Adding storage on %s for disk/%d",
                      self.cfg.GetNodeName(node_uuid), idx)

      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
      names = _GenerateUniqueNames(self.lu, lv_names)

      (data_disk, meta_disk) = dev.children
      vg_data = data_disk.logical_id[0]
      lv_data = objects.Disk(dev_type=constants.DT_PLAIN, size=dev.size,
                             logical_id=(vg_data, names[0]),
                             params=data_disk.params)
      vg_meta = meta_disk.logical_id[0]
      lv_meta = objects.Disk(dev_type=constants.DT_PLAIN,
                             size=constants.DRBD_META_SIZE,
                             logical_id=(vg_meta, names[1]),
                             params=meta_disk.params)

      new_lvs = [lv_data, lv_meta]
      old_lvs = [child.Copy() for child in dev.children]
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)

      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        try:
          _CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True,
                               GetInstanceInfoText(self.instance), False,
                               excl_stor)
        except errors.DeviceCreationError, e:
          raise errors.OpExecError("Can't create block device: %s" % e.message)

    return iv_names

  def _CheckDevices(self, node_uuid, iv_names):
    for name, (dev, _, _) in iv_names.iteritems():
      result = _BlockdevFind(self, node_uuid, dev, self.instance)

      msg = result.fail_msg
      if msg or not result.payload:
        if not msg:
          msg = "disk not found"
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))

      if result.payload.is_degraded:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

  def _RemoveOldStorage(self, node_uuid, iv_names):
    for name, (_, old_lvs, _) in iv_names.iteritems():
      self.lu.LogInfo("Remove logical volumes for %s", name)

      for lv in old_lvs:
        msg = self.rpc.call_blockdev_remove(node_uuid, (lv, self.instance)) \
                .fail_msg
        if msg:
          self.lu.LogWarning("Can't remove old LV: %s", msg,
                             hint="remove unused LVs manually")

  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
    """Replace a disk on the primary or secondary for DRBD 8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaces.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
    self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(
      self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
      False)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    iv_names = self._CreateNewStorage(self.target_node_uuid)

    # Step: for each lv, detach+rename*2+attach
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)

      result = self.rpc.call_blockdev_removechildren(self.target_node_uuid,
                                                     (dev, self.instance),
                                                     (old_lvs, self.instance))
      result.Raise("Can't detach drbd from local storage on node"
                   " %s for device %s" %
                   (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == unique_id on that node

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.logical_id[0],
                                d.logical_id[1] + "_replaced-%s" % suff)

      # Build the rename list based on what LVs exist on the node
      rename_old_to_new = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(self.target_node_uuid,
                                             (to_ren, self.instance))
        if not result.fail_msg and result.payload:
          # device exists
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))

      self.lu.LogInfo("Renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
                                             rename_old_to_new)
      result.Raise("Can't rename old LVs on node %s" %
                   self.cfg.GetNodeName(self.target_node_uuid))

      # Now we rename the new LVs to the old LVs
      self.lu.LogInfo("Renaming the new LVs on the target node")
      rename_new_to_old = [(new, old.logical_id)
                           for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
                                             rename_new_to_old)
      result.Raise("Can't rename new LVs on node %s" %
                   self.cfg.GetNodeName(self.target_node_uuid))

      # Intermediate steps of in memory modifications
      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id

      # We need to modify old_lvs so that removal later removes the
      # right LVs, not the newly added ones; note that old_lvs is a
      # copy here
      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)

      # Now that the new lvs have the old name, we can add them to the device
      self.lu.LogInfo("Adding new mirror component on %s",
                      self.cfg.GetNodeName(self.target_node_uuid))
      result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
                                                  (dev, self.instance),
                                                  (new_lvs, self.instance))
      msg = result.fail_msg
      if msg:
        for new_lv in new_lvs:
          msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
                                               (new_lv, self.instance)).fail_msg
          if msg2:
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                               hint=("cleanup manually the unused logical"
                                     " volumes"))
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)

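    # Steps 5 and 6 are "remove old storage" and "sync devices"; with
    # early_release the removal happens before the sync, otherwise after it,
    # hence the shared step counter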
    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # Release all node locks while waiting for sync
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)

  def _ExecDrbd8Secondary(self, feedback_fn):
    """Replace the secondary node for DRBD 8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6

    pnode = self.instance.primary_node

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.instance.primary_node])
    self._CheckVolumeGroup([self.instance.primary_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.instance.primary_node, True, True)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg,
                                                  self.new_node_uuid)
    for idx, dev in enumerate(disks):
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        try:
          _CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance,
                               new_lv, True, GetInstanceInfoText(self.instance),
                               False, excl_stor)
        except errors.DeviceCreationError, e:
          raise errors.OpExecError("Can't create block device: %s" % e.message)

    # Step 4: drbd minors and drbd setups changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    minors = self.cfg.AllocateDRBDMinor([self.new_node_uuid
                                         for _ in self.instance.disks],
                                        self.instance.uuid)
    logging.debug("Allocated minors %r", minors)

    iv_names = {}
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
      # create new devices on new_node; note that we create two IDs:
      # one without port, so the drbd will be activated without
      # networking information on the new node at this stage, and one
      # with network, for the latter activation in step 4
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if self.instance.primary_node == o_node1:
        p_minor = o_minor1
      else:
        assert self.instance.primary_node == o_node2, "Three-node instance?"
        p_minor = o_minor2

      new_alone_id = (self.instance.primary_node, self.new_node_uuid, None,
                      p_minor, new_minor, o_secret)
      new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port,
                    p_minor, new_minor, o_secret)

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.DT_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children,
                              size=dev.size,
                              params={})
      (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
                                            self.cfg)
      try:
        CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance,
                             anno_new_drbd,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)
      except errors.GenericError:
        self.cfg.ReleaseDRBDMinors(self.instance.uuid)
        raise

    # We have new devices, shutdown the drbd on the old secondary
    for idx, dev in enumerate(self.instance.disks):
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
      msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid,
                                            (dev, self.instance)).fail_msg
      if msg:
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                           " node: %s" % (idx, msg),
                           hint=("Please cleanup this device manually as"
                                 " soon as possible"))

    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2554
    result = self.rpc.call_drbd_disconnect_net(
2555
               [pnode], (self.instance.disks, self.instance))[pnode]
2556

    
2557
    msg = result.fail_msg
2558
    if msg:
2559
      # detaches didn't succeed (unlikely)
2560
      self.cfg.ReleaseDRBDMinors(self.instance.uuid)
2561
      raise errors.OpExecError("Can't detach the disks from the network on"
2562
                               " old node: %s" % (msg,))
2563

    
2564
    # if we managed to detach at least one, we update all the disks of
2565
    # the instance to point to the new secondary
2566
    self.lu.LogInfo("Updating instance configuration")
2567
    for dev, _, new_logical_id in iv_names.itervalues():
2568
      dev.logical_id = new_logical_id
2569

    
2570
    self.cfg.Update(self.instance, feedback_fn)
2571

    
2572
    # Release all node locks (the configuration has been updated)
2573
    ReleaseLocks(self.lu, locking.LEVEL_NODE)
2574

    
2575
    # and now perform the drbd attach
2576
    self.lu.LogInfo("Attaching primary drbds to new secondary"
2577
                    " (standalone => connected)")
2578
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2579
                                            self.new_node_uuid],
2580
                                           (self.instance.disks, self.instance),
2581
                                           self.instance.name,
2582
                                           False)
2583
    for to_node, to_result in result.items():
2584
      msg = to_result.fail_msg
2585
      if msg:
2586
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
2587
                           self.cfg.GetNodeName(to_node), msg,
2588
                           hint=("please do a gnt-instance info to see the"
2589
                                 " status of disks"))
2590

    
2591
    cstep = itertools.count(5)
2592

    
2593
    if self.early_release:
2594
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2595
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
2596
      # TODO: Check if releasing locks early still makes sense
2597
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2598
    else:
2599
      # Release all resource locks except those used by the instance
2600
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2601
                   keep=self.node_secondary_ip.keys())
2602

    
2603
    # TODO: Can the instance lock be downgraded here? Take the optional disk
2604
    # shutdown in the caller into consideration.
2605

    
2606
    # Wait for sync
2607
    # This can fail as the old devices are degraded and _WaitForSync
2608
    # does a combined result over all disks, so we don't check its return value
2609
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2610
    WaitForSync(self.lu, self.instance)
2611

    
2612
    # Check all devices manually
2613
    self._CheckDevices(self.instance.primary_node, iv_names)
2614

    
2615
    # Step: remove old storage
2616
    if not self.early_release:
2617
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2618
      self._RemoveOldStorage(self.target_node_uuid, iv_names)