#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with storage of instances."""

import itertools
import logging
import os
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import ht
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import objects
from ganeti import utils
import ganeti.rpc.node as rpc
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
  AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \
  CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
  IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes, \
  CheckDiskTemplateEnabled
from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
  CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy

import ganeti.masterd.instance


_DISK_TEMPLATE_NAME_PREFIX = {
  constants.DT_PLAIN: "",
  constants.DT_RBD: ".rbd",
  constants.DT_EXT: ".ext",
  constants.DT_FILE: ".file",
  constants.DT_SHARED_FILE: ".sharedfile",
  }


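# Illustrative note (not part of the module's logic): GenerateDiskTemplate()
# below combines these prefixes with a per-disk index when asking
# _GenerateUniqueNames() for volume names, e.g. for two RBD disks starting at
# base index 0 it requests
#
#   ["%s.disk%s" % (".rbd", 0 + i) for i in range(2)]
#     == [".rbd.disk0", ".rbd.disk1"]
#
# Templates without an entry here (such as DRBD8) use their own naming scheme.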
def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
61
                         excl_stor):
62
  """Create a single block device on a given node.
63

64
  This will not recurse over children of the device, so they must be
65
  created in advance.
66

67
  @param lu: the lu on whose behalf we execute
68
  @param node_uuid: the node on which to create the device
69
  @type instance: L{objects.Instance}
70
  @param instance: the instance which owns the device
71
  @type device: L{objects.Disk}
72
  @param device: the device to create
73
  @param info: the extra 'metadata' we should attach to the device
74
      (this will be represented as a LVM tag)
75
  @type force_open: boolean
76
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
80
  @type excl_stor: boolean
81
  @param excl_stor: Whether exclusive_storage is active for the node
82

83
  """
84
  result = lu.rpc.call_blockdev_create(node_uuid, (device, instance),
85
                                       device.size, instance.name, force_open,
86
                                       info, excl_stor)
87
  result.Raise("Can't create block device %s on"
88
               " node %s for instance %s" % (device,
89
                                             lu.cfg.GetNodeName(node_uuid),
90
                                             instance.name))
91

    
92

    
93
def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
94
                         info, force_open, excl_stor):
95
  """Create a tree of block devices on a given node.
96

97
  If this device type has to be created on secondaries, create it and
98
  all its children.
99

100
  If not, just recurse to children keeping the same 'force' value.
101

102
  @attention: The device has to be annotated already.
103

104
  @param lu: the lu on whose behalf we execute
105
  @param node_uuid: the node on which to create the device
106
  @type instance: L{objects.Instance}
107
  @param instance: the instance which owns the device
108
  @type device: L{objects.Disk}
109
  @param device: the device to create
110
  @type force_create: boolean
111
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device for which
      CreateOnSecondary() returns True
114
  @param info: the extra 'metadata' we should attach to the device
115
      (this will be represented as a LVM tag)
116
  @type force_open: boolean
117
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
121
  @type excl_stor: boolean
122
  @param excl_stor: Whether exclusive_storage is active for the node
123

124
  @return: list of created devices
125
  """
126
  created_devices = []
127
  try:
128
    if device.CreateOnSecondary():
129
      force_create = True
130

    
131
    if device.children:
132
      for child in device.children:
133
        devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
134
                                    force_create, info, force_open, excl_stor)
135
        created_devices.extend(devs)
136

    
137
    if not force_create:
138
      return created_devices
139

    
140
    CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
141
                         excl_stor)
142
    # The device has been completely created, so there is no point in keeping
143
    # its subdevices in the list. We just add the device itself instead.
144
    created_devices = [(node_uuid, device)]
145
    return created_devices
146

    
147
  except errors.DeviceCreationError, e:
148
    e.created_devices.extend(created_devices)
149
    raise e
150
  except errors.OpExecError, e:
151
    raise errors.DeviceCreationError(str(e), created_devices)
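# Illustrative sketch of the error contract above (it mirrors the caller code
# in CreateDisks() further down): DeviceCreationError carries the
# (node_uuid, device) pairs that were created before the failure, so a caller
# can fold them into its own bookkeeping and roll everything back:
#
#   try:
#     _CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
#                     f_create)
#   except errors.DeviceCreationError, e:
#     disks_created.extend(e.created_devices)
#     _UndoCreateDisks(lu, disks_created, instance)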
152

    
153

    
154
def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid):
155
  """Whether exclusive_storage is in effect for the given node.
156

157
  @type cfg: L{config.ConfigWriter}
158
  @param cfg: The cluster configuration
159
  @type node_uuid: string
160
  @param node_uuid: The node UUID
161
  @rtype: bool
162
  @return: The effective value of exclusive_storage
163
  @raise errors.OpPrereqError: if no node exists with the given name
164

165
  """
166
  ni = cfg.GetNodeInfo(node_uuid)
167
  if ni is None:
168
    raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
169
                               errors.ECODE_NOENT)
170
  return IsExclusiveStorageEnabledNode(cfg, ni)
171

    
172

    
173
def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
174
                    force_open):
175
  """Wrapper around L{_CreateBlockDevInner}.
176

177
  This method annotates the root device first.
178

179
  """
180
  (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
181
  excl_stor = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
182
  return _CreateBlockDevInner(lu, node_uuid, instance, disk, force_create, info,
183
                              force_open, excl_stor)
184

    
185

    
186
def _UndoCreateDisks(lu, disks_created, instance):
187
  """Undo the work performed by L{CreateDisks}.
188

189
  This function is called in case of an error to undo the work of
190
  L{CreateDisks}.
191

192
  @type lu: L{LogicalUnit}
193
  @param lu: the logical unit on whose behalf we execute
194
  @param disks_created: the result returned by L{CreateDisks}
195
  @type instance: L{objects.Instance}
196
  @param instance: the instance for which disks were created
197

198
  """
199
  for (node_uuid, disk) in disks_created:
200
    result = lu.rpc.call_blockdev_remove(node_uuid, (disk, instance))
201
    result.Warn("Failed to remove newly-created disk %s on node %s" %
202
                (disk, lu.cfg.GetNodeName(node_uuid)), logging.warning)
203

    
204

    
205
def CreateDisks(lu, instance, to_skip=None, target_node_uuid=None, disks=None):
206
  """Create all disks for an instance.
207

208
  This abstracts away some work from AddInstance.
209

210
  @type lu: L{LogicalUnit}
211
  @param lu: the logical unit on whose behalf we execute
212
  @type instance: L{objects.Instance}
213
  @param instance: the instance whose disks we should create
214
  @type to_skip: list
215
  @param to_skip: list of indices to skip
216
  @type target_node_uuid: string
217
  @param target_node_uuid: if passed, overrides the target node for creation
218
  @type disks: list of L{objects.Disk}
219
  @param disks: the disks to create; if not specified, all the disks of the
220
      instance are created
221
  @return: information about the created disks, to be used to call
222
      L{_UndoCreateDisks}
223
  @raise errors.OpPrereqError: in case of error
224

225
  """
226
  info = GetInstanceInfoText(instance)
227
  inst_disks = lu.cfg.GetInstanceDisks(instance)
228
  if target_node_uuid is None:
229
    pnode_uuid = instance.primary_node
230
    all_node_uuids = lu.cfg.GetInstanceNodes(instance, disks=disks)
231
  else:
232
    pnode_uuid = target_node_uuid
233
    all_node_uuids = [pnode_uuid]
234

    
235
  if disks is None:
236
    disks = inst_disks
237

    
238
  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), instance.disk_template)
239

    
240
  if instance.disk_template in constants.DTS_FILEBASED:
241
    if inst_disks:
242
      file_storage_dir = os.path.dirname(inst_disks[0].logical_id[1])
243
    else:
244
      file_storage_dir = os.path.dirname(disks[0].logical_id[1])
245
    result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir)
246

    
247
    result.Raise("Failed to create directory '%s' on"
248
                 " node %s" % (file_storage_dir,
249
                               lu.cfg.GetNodeName(pnode_uuid)))
250

    
251
  disks_created = []
252
  for idx, device in enumerate(disks):
253
    if to_skip and idx in to_skip:
254
      continue
255
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
256
    for node_uuid in all_node_uuids:
257
      f_create = node_uuid == pnode_uuid
258
      try:
259
        _CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
260
                        f_create)
261
        disks_created.append((node_uuid, device))
262
      except errors.DeviceCreationError, e:
263
        logging.warning("Creating disk %s for instance '%s' failed",
264
                        idx, instance.name)
265
        disks_created.extend(e.created_devices)
266
        _UndoCreateDisks(lu, disks_created, instance)
267
        raise errors.OpExecError(e.message)
268
  return disks_created
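# Illustrative example (node UUIDs are made up): for a two-disk DRBD instance
# the returned list typically holds one entry per (node, disk) pair, e.g.
#
#   [("node-uuid-a", disk0), ("node-uuid-b", disk0),
#    ("node-uuid-a", disk1), ("node-uuid-b", disk1)]
#
# and this is exactly the structure _UndoCreateDisks() expects for cleanup.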
269

    
270

    
271
def ComputeDiskSizePerVG(disk_template, disks):
272
  """Compute disk size requirements in the volume group
273

274
  """
275
  def _compute(disks, payload):
276
    """Universal algorithm.
277

278
    """
279
    vgs = {}
280
    for disk in disks:
281
      vgs[disk[constants.IDISK_VG]] = \
        vgs.get(disk[constants.IDISK_VG], 0) + \
        disk[constants.IDISK_SIZE] + payload
283

    
284
    return vgs
285

    
286
  # Required free disk space as a function of disk and swap space
287
  req_size_dict = {
288
    constants.DT_DISKLESS: {},
289
    constants.DT_PLAIN: _compute(disks, 0),
290
    # 128 MB are added for drbd metadata for each disk
291
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
292
    constants.DT_FILE: {},
293
    constants.DT_SHARED_FILE: {},
294
    constants.DT_GLUSTER: {},
295
    }
296

    
297
  if disk_template not in req_size_dict:
298
    raise errors.ProgrammerError("Disk template '%s' size requirement"
299
                                 " is unknown" % disk_template)
300

    
301
  return req_size_dict[disk_template]
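# Illustrative usage (VG names and sizes in MiB are made up):
#
#   ComputeDiskSizePerVG(constants.DT_PLAIN,
#                        [{constants.IDISK_VG: "xenvg",
#                          constants.IDISK_SIZE: 1024},
#                         {constants.IDISK_VG: "altvg",
#                          constants.IDISK_SIZE: 512}])
#   -> {"xenvg": 1024, "altvg": 512}
#
# For DT_DRBD8 every disk additionally accounts for constants.DRBD_META_SIZE
# MiB of metadata, while file-based and diskless templates need no VG space.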
302

    
303

    
304
def ComputeDisks(op, default_vg):
305
  """Computes the instance disks.
306

307
  @param op: The instance opcode
308
  @param default_vg: The default_vg to assume
309

310
  @return: The computed disks
311

312
  """
313
  disks = []
314
  for disk in op.disks:
315
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
316
    if mode not in constants.DISK_ACCESS_SET:
317
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
318
                                 mode, errors.ECODE_INVAL)
319
    size = disk.get(constants.IDISK_SIZE, None)
320
    if size is None:
321
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
322
    try:
323
      size = int(size)
324
    except (TypeError, ValueError):
325
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
326
                                 errors.ECODE_INVAL)
327

    
328
    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
329
    if ext_provider and op.disk_template != constants.DT_EXT:
330
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
331
                                 " disk template, not %s" %
332
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
333
                                  op.disk_template), errors.ECODE_INVAL)
334

    
335
    data_vg = disk.get(constants.IDISK_VG, default_vg)
336
    name = disk.get(constants.IDISK_NAME, None)
337
    if name is not None and name.lower() == constants.VALUE_NONE:
338
      name = None
339
    new_disk = {
340
      constants.IDISK_SIZE: size,
341
      constants.IDISK_MODE: mode,
342
      constants.IDISK_VG: data_vg,
343
      constants.IDISK_NAME: name,
344
      }
345

    
346
    for key in [
347
      constants.IDISK_METAVG,
348
      constants.IDISK_ADOPT,
349
      constants.IDISK_SPINDLES,
350
      ]:
351
      if key in disk:
352
        new_disk[key] = disk[key]
353

    
354
    # For extstorage, demand the `provider' option and add any
355
    # additional parameters (ext-params) to the dict
356
    if op.disk_template == constants.DT_EXT:
357
      if ext_provider:
358
        new_disk[constants.IDISK_PROVIDER] = ext_provider
359
        for key in disk:
360
          if key not in constants.IDISK_PARAMS:
361
            new_disk[key] = disk[key]
362
      else:
363
        raise errors.OpPrereqError("Missing provider for template '%s'" %
364
                                   constants.DT_EXT, errors.ECODE_INVAL)
365

    
366
    disks.append(new_disk)
367

    
368
  return disks
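# Illustrative usage sketch: ComputeDisks() only reads the "disks" and
# "disk_template" attributes of the opcode, so any stand-in object with those
# two attributes behaves the same (the namedtuple below is purely for
# demonstration and is not part of the module):
#
#   _FakeOp = collections.namedtuple("_FakeOp", ["disks", "disk_template"])
#   op = _FakeOp(disks=[{constants.IDISK_SIZE: "1024"}],
#                disk_template=constants.DT_PLAIN)
#   ComputeDisks(op, "xenvg")
#   -> [{constants.IDISK_SIZE: 1024,
#        constants.IDISK_MODE: constants.DISK_RDWR,
#        constants.IDISK_VG: "xenvg",
#        constants.IDISK_NAME: None}]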
369

    
370

    
371
def CheckRADOSFreeSpace():
372
  """Compute disk size requirements inside the RADOS cluster.
373

374
  """
375
  # For the RADOS cluster we assume there is always enough space.
376
  pass
377

    
378

    
379
def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
380
                         iv_name, p_minor, s_minor):
381
  """Generate a drbd8 device complete with its children.
382

383
  """
384
  assert len(vgnames) == len(names) == 2
385
  port = lu.cfg.AllocatePort()
386
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
387

    
388
  dev_data = objects.Disk(dev_type=constants.DT_PLAIN, size=size,
389
                          logical_id=(vgnames[0], names[0]),
390
                          params={})
391
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
392
  dev_meta = objects.Disk(dev_type=constants.DT_PLAIN,
393
                          size=constants.DRBD_META_SIZE,
394
                          logical_id=(vgnames[1], names[1]),
395
                          params={})
396
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
397
  drbd_dev = objects.Disk(dev_type=constants.DT_DRBD8, size=size,
398
                          logical_id=(primary_uuid, secondary_uuid, port,
399
                                      p_minor, s_minor,
400
                                      shared_secret),
401
                          children=[dev_data, dev_meta],
402
                          iv_name=iv_name, params={})
403
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
404
  return drbd_dev
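# Illustrative result (port, minors and secret are made up): for a 1024 MiB
# disk the function returns roughly
#
#   Disk(DT_DRBD8, size=1024,
#        logical_id=(primary_uuid, secondary_uuid, 11000, 0, 1, "secret"),
#        children=[Disk(DT_PLAIN, size=1024),                       # data LV
#                  Disk(DT_PLAIN, size=constants.DRBD_META_SIZE)])  # meta LV
#
# i.e. a DRBD8 device stacked on one data LV and one metadata LV.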
405

    
406

    
407
def GenerateDiskTemplate(
408
  lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
409
  disk_info, file_storage_dir, file_driver, base_index,
410
  feedback_fn, full_disk_params):
411
  """Generate the entire disk layout for a given template type.
412

413
  """
414
  vgname = lu.cfg.GetVGName()
415
  disk_count = len(disk_info)
416
  disks = []
417

    
418
  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), template_name)
419

    
420
  if template_name == constants.DT_DISKLESS:
421
    pass
422
  elif template_name == constants.DT_DRBD8:
423
    if len(secondary_node_uuids) != 1:
424
      raise errors.ProgrammerError("Wrong template configuration")
425
    remote_node_uuid = secondary_node_uuids[0]
426
    minors = lu.cfg.AllocateDRBDMinor(
427
      [primary_node_uuid, remote_node_uuid] * len(disk_info), instance_uuid)
428

    
429
    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
430
                                                       full_disk_params)
431
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
432

    
433
    names = []
434
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
435
                                               for i in range(disk_count)]):
436
      names.append(lv_prefix + "_data")
437
      names.append(lv_prefix + "_meta")
438
    for idx, disk in enumerate(disk_info):
439
      disk_index = idx + base_index
440
      data_vg = disk.get(constants.IDISK_VG, vgname)
441
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
442
      disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
443
                                      disk[constants.IDISK_SIZE],
444
                                      [data_vg, meta_vg],
445
                                      names[idx * 2:idx * 2 + 2],
446
                                      "disk/%d" % disk_index,
447
                                      minors[idx * 2], minors[idx * 2 + 1])
448
      disk_dev.mode = disk[constants.IDISK_MODE]
449
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
450
      disks.append(disk_dev)
451
  else:
452
    if secondary_node_uuids:
453
      raise errors.ProgrammerError("Wrong template configuration")
454

    
455
    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
456
    if name_prefix is None:
457
      names = None
458
    else:
459
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
460
                                        (name_prefix, base_index + i)
461
                                        for i in range(disk_count)])
462

    
463
    if template_name == constants.DT_PLAIN:
464

    
465
      def logical_id_fn(idx, _, disk):
466
        vg = disk.get(constants.IDISK_VG, vgname)
467
        return (vg, names[idx])
468

    
469
    elif template_name == constants.DT_GLUSTER:
470
      logical_id_fn = lambda _1, disk_index, _2: \
471
        (file_driver, "ganeti/%s.%d" % (instance_uuid,
472
                                        disk_index))
473

    
474
    elif template_name in constants.DTS_FILEBASED: # Gluster handled above
475
      logical_id_fn = \
476
        lambda _, disk_index, disk: (file_driver,
477
                                     "%s/%s" % (file_storage_dir,
478
                                                names[idx]))
479
    elif template_name == constants.DT_BLOCK:
480
      logical_id_fn = \
481
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
482
                                       disk[constants.IDISK_ADOPT])
483
    elif template_name == constants.DT_RBD:
484
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
485
    elif template_name == constants.DT_EXT:
486
      def logical_id_fn(idx, _, disk):
487
        provider = disk.get(constants.IDISK_PROVIDER, None)
488
        if provider is None:
489
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
490
                                       " not found", constants.DT_EXT,
491
                                       constants.IDISK_PROVIDER)
492
        return (provider, names[idx])
493
    else:
494
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
495

    
496
    dev_type = template_name
497

    
498
    for idx, disk in enumerate(disk_info):
499
      params = {}
500
      # Only for the Ext template add disk_info to params
501
      if template_name == constants.DT_EXT:
502
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
503
        for key in disk:
504
          if key not in constants.IDISK_PARAMS:
505
            params[key] = disk[key]
506
      disk_index = idx + base_index
507
      size = disk[constants.IDISK_SIZE]
508
      feedback_fn("* disk %s, size %s" %
509
                  (disk_index, utils.FormatUnit(size, "h")))
510
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
511
                              logical_id=logical_id_fn(idx, disk_index, disk),
512
                              iv_name="disk/%d" % disk_index,
513
                              mode=disk[constants.IDISK_MODE],
514
                              params=params,
515
                              spindles=disk.get(constants.IDISK_SPINDLES))
516
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
517
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
518
      disks.append(disk_dev)
519

    
520
  return disks
521

    
522

    
523
def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
524
  """Check the presence of the spindle options with exclusive_storage.
525

526
  @type diskdict: dict
527
  @param diskdict: disk parameters
528
  @type es_flag: bool
529
  @param es_flag: the effective value of the exclusive_storage flag
  @type required: bool
  @param required: whether spindles are required or just optional
  @raise errors.OpPrereqError: when spindles are given and they should not be
533

534
  """
535
  if (not es_flag and constants.IDISK_SPINDLES in diskdict and
536
      diskdict[constants.IDISK_SPINDLES] is not None):
537
    raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
538
                               " when exclusive storage is not active",
539
                               errors.ECODE_INVAL)
540
  if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or
541
                                diskdict[constants.IDISK_SPINDLES] is None)):
542
    raise errors.OpPrereqError("You must specify spindles in instance disks"
543
                               " when exclusive storage is active",
544
                               errors.ECODE_INVAL)
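# Illustrative behaviour of the checks above:
#
#   CheckSpindlesExclusiveStorage({constants.IDISK_SPINDLES: 2}, False, False)
#     -> raises OpPrereqError (spindles given without exclusive storage)
#   CheckSpindlesExclusiveStorage({}, True, True)
#     -> raises OpPrereqError (spindles required but missing)
#   CheckSpindlesExclusiveStorage({constants.IDISK_SPINDLES: 2}, True, True)
#     -> passes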
545

    
546

    
547
class LUInstanceRecreateDisks(LogicalUnit):
548
  """Recreate an instance's missing disks.
549

550
  """
551
  HPATH = "instance-recreate-disks"
552
  HTYPE = constants.HTYPE_INSTANCE
553
  REQ_BGL = False
554

    
555
  _MODIFYABLE = compat.UniqueFrozenset([
556
    constants.IDISK_SIZE,
557
    constants.IDISK_MODE,
558
    constants.IDISK_SPINDLES,
559
    ])
560

    
561
  # New or changed disk parameters may have different semantics
562
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
563
    constants.IDISK_ADOPT,
564

    
565
    # TODO: Implement support changing VG while recreating
566
    constants.IDISK_VG,
567
    constants.IDISK_METAVG,
568
    constants.IDISK_PROVIDER,
569
    constants.IDISK_NAME,
570
    ]))
571

    
572
  def _RunAllocator(self):
573
    """Run the allocator based on input opcode.
574

575
    """
576
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
577

    
578
    # FIXME
579
    # The allocator should actually run in "relocate" mode, but current
580
    # allocators don't support relocating all the nodes of an instance at
581
    # the same time. As a workaround we use "allocate" mode, but this is
582
    # suboptimal for two reasons:
583
    # - The instance name passed to the allocator is present in the list of
584
    #   existing instances, so there could be a conflict within the
585
    #   internal structures of the allocator. This doesn't happen with the
586
    #   current allocators, but it's a liability.
587
    # - The allocator counts the resources used by the instance twice: once
588
    #   because the instance exists already, and once because it tries to
589
    #   allocate a new instance.
590
    # The allocator could choose some of the nodes on which the instance is
591
    # running, but that's not a problem. If the instance nodes are broken,
592
    # they should already be marked as drained or offline, and hence
593
    # skipped by the allocator. If instance disks have been lost for other
594
    # reasons, then recreating the disks on the same nodes should be fine.
595
    disk_template = self.instance.disk_template
596
    spindle_use = be_full[constants.BE_SPINDLE_USE]
597
    disks = [{
598
      constants.IDISK_SIZE: d.size,
599
      constants.IDISK_MODE: d.mode,
600
      constants.IDISK_SPINDLES: d.spindles,
601
      } for d in self.cfg.GetInstanceDisks(self.instance)]
602
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
603
                                        disk_template=disk_template,
604
                                        tags=list(self.instance.GetTags()),
605
                                        os=self.instance.os,
606
                                        nics=[{}],
607
                                        vcpus=be_full[constants.BE_VCPUS],
608
                                        memory=be_full[constants.BE_MAXMEM],
609
                                        spindle_use=spindle_use,
610
                                        disks=disks,
611
                                        hypervisor=self.instance.hypervisor,
612
                                        node_whitelist=None)
613
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
614

    
615
    ial.Run(self.op.iallocator)
616

    
617
    assert req.RequiredNodes() == len(self.cfg.GetInstanceNodes(self.instance))
618

    
619
    if not ial.success:
620
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
621
                                 " %s" % (self.op.iallocator, ial.info),
622
                                 errors.ECODE_NORES)
623

    
624
    (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
625
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
626
                 self.op.instance_name, self.op.iallocator,
627
                 utils.CommaJoin(self.op.nodes))
628

    
629
  def CheckArguments(self):
630
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
631
      # Normalize and convert deprecated list of disk indices
632
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
633

    
634
    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
635
    if duplicates:
636
      raise errors.OpPrereqError("Some disks have been specified more than"
637
                                 " once: %s" % utils.CommaJoin(duplicates),
638
                                 errors.ECODE_INVAL)
639

    
640
    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
641
    # when neither iallocator nor nodes are specified
642
    if self.op.iallocator or self.op.nodes:
643
      CheckIAllocatorOrNode(self, "iallocator", "nodes")
644

    
645
    for (idx, params) in self.op.disks:
646
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
647
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
648
      if unsupported:
649
        raise errors.OpPrereqError("Parameters for disk %s try to change"
650
                                   " unmodifyable parameter(s): %s" %
651
                                   (idx, utils.CommaJoin(unsupported)),
652
                                   errors.ECODE_INVAL)
653

    
654
  def ExpandNames(self):
655
    self._ExpandAndLockInstance()
656
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
657

    
658
    if self.op.nodes:
659
      (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
660
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)
661
    else:
662
      self.needed_locks[locking.LEVEL_NODE] = []
663
      if self.op.iallocator:
664
        # iallocator will select a new node in the same group
665
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
666
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
667

    
668
    self.needed_locks[locking.LEVEL_NODE_RES] = []
669

    
670
  def DeclareLocks(self, level):
671
    if level == locking.LEVEL_NODEGROUP:
672
      assert self.op.iallocator is not None
673
      assert not self.op.nodes
674
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
675
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
676
      # Lock the primary group used by the instance optimistically; this
677
      # requires going via the node before it's locked, requiring
678
      # verification later on
679
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
680
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, primary_only=True)
681

    
682
    elif level == locking.LEVEL_NODE:
683
      # If an allocator is used, then we lock all the nodes in the current
684
      # instance group, as we don't know yet which ones will be selected;
685
      # if we replace the nodes without using an allocator, locks are
686
      # already declared in ExpandNames; otherwise, we need to lock all the
687
      # instance nodes for disk re-creation
688
      if self.op.iallocator:
689
        assert not self.op.nodes
690
        assert not self.needed_locks[locking.LEVEL_NODE]
691
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
692

    
693
        # Lock member nodes of the group of the primary node
694
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
695
          self.needed_locks[locking.LEVEL_NODE].extend(
696
            self.cfg.GetNodeGroup(group_uuid).members)
697

    
698
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
699
      elif not self.op.nodes:
700
        self._LockInstancesNodes(primary_only=False)
701
    elif level == locking.LEVEL_NODE_RES:
702
      # Copy node locks
703
      self.needed_locks[locking.LEVEL_NODE_RES] = \
704
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])
705

    
706
  def BuildHooksEnv(self):
707
    """Build hooks env.
708

709
    This runs on master, primary and secondary nodes of the instance.
710

711
    """
712
    return BuildInstanceHookEnvByObject(self, self.instance)
713

    
714
  def BuildHooksNodes(self):
715
    """Build hooks nodes.
716

717
    """
718
    nl = [self.cfg.GetMasterNode()] + \
719
      list(self.cfg.GetInstanceNodes(self.instance))
720
    return (nl, nl)
721

    
722
  def CheckPrereq(self):
723
    """Check prerequisites.
724

725
    This checks that the instance is in the cluster and is not running.
726

727
    """
728
    instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
729
    assert instance is not None, \
730
      "Cannot retrieve locked instance %s" % self.op.instance_name
731
    if self.op.node_uuids:
732
      inst_nodes = self.cfg.GetInstanceNodes(instance)
733
      if len(self.op.node_uuids) != len(inst_nodes):
734
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
735
                                   " %d replacement nodes were specified" %
736
                                   (instance.name, len(inst_nodes),
737
                                    len(self.op.node_uuids)),
738
                                   errors.ECODE_INVAL)
739
      assert instance.disk_template != constants.DT_DRBD8 or \
740
             len(self.op.node_uuids) == 2
741
      assert instance.disk_template != constants.DT_PLAIN or \
742
             len(self.op.node_uuids) == 1
743
      primary_node = self.op.node_uuids[0]
744
    else:
745
      primary_node = instance.primary_node
746
    if not self.op.iallocator:
747
      CheckNodeOnline(self, primary_node)
748

    
749
    if instance.disk_template == constants.DT_DISKLESS:
750
      raise errors.OpPrereqError("Instance '%s' has no disks" %
751
                                 self.op.instance_name, errors.ECODE_INVAL)
752

    
753
    # Verify if node group locks are still correct
754
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
755
    if owned_groups:
756
      # Node group locks are acquired only for the primary node (and only
757
      # when the allocator is used)
758
      CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
759
                              primary_only=True)
760

    
761
    # if we replace nodes *and* the old primary is offline, we don't
762
    # check the instance state
763
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
764
    if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
765
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
766
                         msg="cannot recreate disks")
767

    
768
    if self.op.disks:
769
      self.disks = dict(self.op.disks)
770
    else:
771
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
772

    
773
    maxidx = max(self.disks.keys())
774
    if maxidx >= len(instance.disks):
775
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
776
                                 errors.ECODE_INVAL)
777

    
778
    if ((self.op.node_uuids or self.op.iallocator) and
779
         sorted(self.disks.keys()) != range(len(instance.disks))):
780
      raise errors.OpPrereqError("Can't recreate disks partially and"
781
                                 " change the nodes at the same time",
782
                                 errors.ECODE_INVAL)
783

    
784
    self.instance = instance
785

    
786
    if self.op.iallocator:
787
      self._RunAllocator()
788
      # Release unneeded node and node resource locks
789
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
790
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
791
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
792

    
793
    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
794

    
795
    if self.op.node_uuids:
796
      node_uuids = self.op.node_uuids
797
    else:
798
      node_uuids = self.cfg.GetInstanceNodes(instance)
799
    excl_stor = compat.any(
800
      rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
801
      )
802
    for new_params in self.disks.values():
803
      CheckSpindlesExclusiveStorage(new_params, excl_stor, False)
804

    
805
  def Exec(self, feedback_fn):
806
    """Recreate the disks.
807

808
    """
809
    assert (self.owned_locks(locking.LEVEL_NODE) ==
810
            self.owned_locks(locking.LEVEL_NODE_RES))
811

    
812
    to_skip = []
813
    mods = [] # keeps track of needed changes
814

    
815
    inst_disks = self.cfg.GetInstanceDisks(self.instance)
816
    for idx, disk in enumerate(inst_disks):
817
      try:
818
        changes = self.disks[idx]
819
      except KeyError:
820
        # Disk should not be recreated
821
        to_skip.append(idx)
822
        continue
823

    
824
      # update secondaries for disks, if needed
825
      if self.op.node_uuids and disk.dev_type == constants.DT_DRBD8:
826
        # need to update the nodes and minors
827
        assert len(self.op.node_uuids) == 2
828
        assert len(disk.logical_id) == 6 # otherwise disk internals
829
                                         # have changed
830
        (_, _, old_port, _, _, old_secret) = disk.logical_id
831
        new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
832
                                                self.instance.uuid)
833
        new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
834
                  new_minors[0], new_minors[1], old_secret)
835
        assert len(disk.logical_id) == len(new_id)
836
      else:
837
        new_id = None
838

    
839
      mods.append((idx, new_id, changes))
840

    
841
    # now that we have passed all asserts above, we can apply the mods
842
    # in a single run (to avoid partial changes)
843
    for idx, new_id, changes in mods:
844
      disk = inst_disks[idx]
845
      if new_id is not None:
846
        assert disk.dev_type == constants.DT_DRBD8
847
        disk.logical_id = new_id
848
      if changes:
849
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
850
                    mode=changes.get(constants.IDISK_MODE, None),
851
                    spindles=changes.get(constants.IDISK_SPINDLES, None))
852
      self.cfg.Update(disk, feedback_fn)
853

    
854
    # change primary node, if needed
855
    if self.op.node_uuids:
856
      self.instance.primary_node = self.op.node_uuids[0]
857
      self.LogWarning("Changing the instance's nodes, you will have to"
858
                      " remove any disks left on the older nodes manually")
859

    
860
    if self.op.node_uuids:
861
      self.cfg.Update(self.instance, feedback_fn)
862

    
863
    # All touched nodes must be locked
864
    mylocks = self.owned_locks(locking.LEVEL_NODE)
865
    inst_nodes = self.cfg.GetInstanceNodes(self.instance)
866
    assert mylocks.issuperset(frozenset(inst_nodes))
867
    new_disks = CreateDisks(self, self.instance, to_skip=to_skip)
868

    
869
    # TODO: Release node locks before wiping, or explain why it's not possible
870
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
871
      inst_disks = self.cfg.GetInstanceDisks(self.instance)
872
      wipedisks = [(idx, disk, 0)
873
                   for (idx, disk) in enumerate(inst_disks)
874
                   if idx not in to_skip]
875
      WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
876
                         cleanup=new_disks)
877

    
878

    
879
def _PerformNodeInfoCall(lu, node_uuids, vg):
880
  """Prepares the input and performs a node info call.
881

882
  @type lu: C{LogicalUnit}
883
  @param lu: a logical unit from which we get configuration data
884
  @type node_uuids: list of string
885
  @param node_uuids: list of node UUIDs to perform the call for
886
  @type vg: string
887
  @param vg: the volume group's name
888

889
  """
890
  lvm_storage_units = [(constants.ST_LVM_VG, vg)]
891
  storage_units = rpc.PrepareStorageUnitsForNodes(lu.cfg, lvm_storage_units,
892
                                                  node_uuids)
893
  hvname = lu.cfg.GetHypervisorType()
894
  hvparams = lu.cfg.GetClusterInfo().hvparams
895
  nodeinfo = lu.rpc.call_node_info(node_uuids, storage_units,
896
                                   [(hvname, hvparams[hvname])])
897
  return nodeinfo
898

    
899

    
900
def _CheckVgCapacityForNode(node_name, node_info, vg, requested):
901
  """Checks the vg capacity for a given node.
902

903
  @type node_info: tuple (_, list of dicts, _)
904
  @param node_info: the result of the node info call for one node
905
  @type node_name: string
906
  @param node_name: the name of the node
907
  @type vg: string
908
  @param vg: volume group name
909
  @type requested: int
910
  @param requested: the amount of disk in MiB to check for
911
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
912
      or we cannot check the node
913

914
  """
915
  (_, space_info, _) = node_info
916
  lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType(
917
      space_info, constants.ST_LVM_VG)
918
  if not lvm_vg_info:
919
    raise errors.OpPrereqError("Can't retrieve storage information for LVM")
920
  vg_free = lvm_vg_info.get("storage_free", None)
921
  if not isinstance(vg_free, int):
922
    raise errors.OpPrereqError("Can't compute free disk space on node"
923
                               " %s for vg %s, result was '%s'" %
924
                               (node_name, vg, vg_free), errors.ECODE_ENVIRON)
925
  if requested > vg_free:
926
    raise errors.OpPrereqError("Not enough disk space on target node %s"
927
                               " vg %s: required %d MiB, available %d MiB" %
928
                               (node_name, vg, requested, vg_free),
929
                               errors.ECODE_NORES)
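# Illustrative example (values are made up): if the node-info payload reports
# 10240 MiB of free space in the volume group, then
#
#   _CheckVgCapacityForNode("node1.example.com", node_info, "xenvg", 8192)
#
# passes, while the same call with requested=20480 raises OpPrereqError with
# ECODE_NORES.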
930

    
931

    
932
def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
933
  """Checks if nodes have enough free disk space in the specified VG.
934

935
  This function checks if all given nodes have the needed amount of
936
  free disk. In case any node has less disk or we cannot get the
937
  information from the node, this function raises an OpPrereqError
938
  exception.
939

940
  @type lu: C{LogicalUnit}
941
  @param lu: a logical unit from which we get configuration data
942
  @type node_uuids: C{list}
943
  @param node_uuids: the list of node UUIDs to check
944
  @type vg: C{str}
945
  @param vg: the volume group to check
946
  @type requested: C{int}
947
  @param requested: the amount of disk in MiB to check for
948
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
949
      or we cannot check the node
950

951
  """
952
  nodeinfo = _PerformNodeInfoCall(lu, node_uuids, vg)
953
  for node_uuid in node_uuids:
954
    node_name = lu.cfg.GetNodeName(node_uuid)
955
    info = nodeinfo[node_uuid]
956
    info.Raise("Cannot get current information from node %s" % node_name,
957
               prereq=True, ecode=errors.ECODE_ENVIRON)
958
    _CheckVgCapacityForNode(node_name, info.payload, vg, requested)
959

    
960

    
961
def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
962
  """Checks if nodes have enough free disk space in all the VGs.
963

964
  This function checks if all given nodes have the needed amount of
965
  free disk. In case any node has less disk or we cannot get the
966
  information from the node, this function raises an OpPrereqError
967
  exception.
968

969
  @type lu: C{LogicalUnit}
970
  @param lu: a logical unit from which we get configuration data
971
  @type node_uuids: C{list}
972
  @param node_uuids: the list of node UUIDs to check
973
  @type req_sizes: C{dict}
974
  @param req_sizes: the hash of vg and corresponding amount of disk in
975
      MiB to check for
976
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
977
      or we cannot check the node
978

979
  """
980
  for vg, req_size in req_sizes.items():
981
    _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, req_size)
982

    
983

    
984
def _DiskSizeInBytesToMebibytes(lu, size):
985
  """Converts a disk size in bytes to mebibytes.
986

987
  Warns and rounds up if the size isn't an even multiple of 1 MiB.
988

989
  """
990
  (mib, remainder) = divmod(size, 1024 * 1024)
991

    
992
  if remainder != 0:
993
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
994
                  " to not overwrite existing data (%s bytes will not be"
995
                  " wiped)", (1024 * 1024) - remainder)
996
    mib += 1
997

    
998
  return mib
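# Illustrative examples:
#
#   _DiskSizeInBytesToMebibytes(lu, 1024 * 1024 * 1024)      ->  1024
#   _DiskSizeInBytesToMebibytes(lu, 1024 * 1024 * 1024 + 1)  ->  1025
#
# the second call also warns that 1048575 bytes will not be wiped.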
999

    
1000

    
1001
def _CalcEta(time_taken, written, total_size):
1002
  """Calculates the ETA based on size written and total size.
1003

1004
  @param time_taken: The time taken so far
1005
  @param written: amount written so far
1006
  @param total_size: The total size of data to be written
1007
  @return: The remaining time in seconds
1008

1009
  """
1010
  avg_time = time_taken / float(written)
1011
  return (total_size - written) * avg_time
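# Illustrative example: after 30 seconds with 256 MiB of 1024 MiB written,
#
#   _CalcEta(30.0, 256, 1024)  ==  (1024 - 256) * (30.0 / 256)  ==  90.0
#
# i.e. roughly 90 seconds remaining at the average rate observed so far.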
1012

    
1013

    
1014
def WipeDisks(lu, instance, disks=None):
1015
  """Wipes instance disks.
1016

1017
  @type lu: L{LogicalUnit}
1018
  @param lu: the logical unit on whose behalf we execute
1019
  @type instance: L{objects.Instance}
1020
  @param instance: the instance whose disks we should create
1021
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
1022
  @param disks: Disk details; tuple contains disk index, disk object and the
1023
    start offset
1024

1025
  """
1026
  node_uuid = instance.primary_node
1027
  node_name = lu.cfg.GetNodeName(node_uuid)
1028

    
1029
  if disks is None:
1030
    inst_disks = lu.cfg.GetInstanceDisks(instance)
1031
    disks = [(idx, disk, 0)
1032
             for (idx, disk) in enumerate(inst_disks)]
1033

    
1034
  logging.info("Pausing synchronization of disks of instance '%s'",
1035
               instance.name)
1036
  result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1037
                                                  (map(compat.snd, disks),
1038
                                                   instance),
1039
                                                  True)
1040
  result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)
1041

    
1042
  for idx, success in enumerate(result.payload):
1043
    if not success:
1044
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
1045
                   " failed", idx, instance.name)
1046

    
1047
  try:
1048
    for (idx, device, offset) in disks:
1049
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
1050
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
1051
      wipe_chunk_size = \
1052
        int(min(constants.MAX_WIPE_CHUNK,
1053
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
1054

    
1055
      size = device.size
1056
      last_output = 0
1057
      start_time = time.time()
1058

    
1059
      if offset == 0:
1060
        info_text = ""
1061
      else:
1062
        info_text = (" (from %s to %s)" %
1063
                     (utils.FormatUnit(offset, "h"),
1064
                      utils.FormatUnit(size, "h")))
1065

    
1066
      lu.LogInfo("* Wiping disk %s%s", idx, info_text)
1067

    
1068
      logging.info("Wiping disk %d for instance %s on node %s using"
1069
                   " chunk size %s", idx, instance.name, node_name,
1070
                   wipe_chunk_size)
1071

    
1072
      while offset < size:
1073
        wipe_size = min(wipe_chunk_size, size - offset)
1074

    
1075
        logging.debug("Wiping disk %d, offset %s, chunk %s",
1076
                      idx, offset, wipe_size)
1077

    
1078
        result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
1079
                                           offset, wipe_size)
1080
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
1081
                     (idx, offset, wipe_size))
1082

    
1083
        now = time.time()
1084
        offset += wipe_size
1085
        if now - last_output >= 60:
1086
          eta = _CalcEta(now - start_time, offset, size)
1087
          lu.LogInfo(" - done: %.1f%% ETA: %s",
1088
                     offset / float(size) * 100, utils.FormatSeconds(eta))
1089
          last_output = now
1090
  finally:
1091
    logging.info("Resuming synchronization of disks for instance '%s'",
1092
                 instance.name)
1093

    
1094
    result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1095
                                                    (map(compat.snd, disks),
1096
                                                     instance),
1097
                                                    False)
1098

    
1099
    if result.fail_msg:
1100
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
1101
                    node_name, result.fail_msg)
1102
    else:
1103
      for idx, success in enumerate(result.payload):
1104
        if not success:
1105
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
1106
                        " failed", idx, instance.name)
1107

    
1108

    
1109
def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
1110
  """Wrapper for L{WipeDisks} that handles errors.
1111

1112
  @type lu: L{LogicalUnit}
1113
  @param lu: the logical unit on whose behalf we execute
1114
  @type instance: L{objects.Instance}
1115
  @param instance: the instance whose disks we should wipe
1116
  @param disks: see L{WipeDisks}
1117
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
1118
      case of error
1119
  @raise errors.OpPrereqError: in case of failure
1120

1121
  """
1122
  try:
1123
    WipeDisks(lu, instance, disks=disks)
1124
  except errors.OpExecError:
1125
    logging.warning("Wiping disks for instance '%s' failed",
1126
                    instance.name)
1127
    _UndoCreateDisks(lu, cleanup, instance)
1128
    raise
1129

    
1130

    
1131
def ExpandCheckDisks(instance_disks, disks):
1132
  """Return the instance disks selected by the disks list
1133

1134
  @type disks: list of L{objects.Disk} or None
1135
  @param disks: selected disks
1136
  @rtype: list of L{objects.Disk}
1137
  @return: selected instance disks to act on
1138

1139
  """
1140
  if disks is None:
1141
    return instance_disks
1142
  else:
1143
    if not set(disks).issubset(instance_disks):
1144
      raise errors.ProgrammerError("Can only act on disks belonging to the"
1145
                                   " target instance: expected a subset of %r,"
1146
                                   " got %r" % (instance_disks, disks))
1147
    return disks
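# Illustrative behaviour (d0, d1, d2 stand for the instance's disk objects):
#
#   ExpandCheckDisks([d0, d1, d2], None)     ->  [d0, d1, d2]
#   ExpandCheckDisks([d0, d1, d2], [d1])     ->  [d1]
#   ExpandCheckDisks([d0, d1, d2], [other])  ->  raises ProgrammerError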
1148

    
1149

    
1150
def WaitForSync(lu, instance, disks=None, oneshot=False):
1151
  """Sleep and poll for an instance's disk to sync.
1152

1153
  """
1154
  inst_disks = lu.cfg.GetInstanceDisks(instance)
1155
  if not inst_disks or disks is not None and not disks:
1156
    return True
1157

    
1158
  disks = ExpandCheckDisks(inst_disks, disks)
1159

    
1160
  if not oneshot:
1161
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1162

    
1163
  node_uuid = instance.primary_node
1164
  node_name = lu.cfg.GetNodeName(node_uuid)
1165

    
1166
  # TODO: Convert to utils.Retry
1167

    
1168
  retries = 0
1169
  degr_retries = 10 # in seconds, as we sleep 1 second each time
1170
  while True:
1171
    max_time = 0
1172
    done = True
1173
    cumul_degraded = False
1174
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
1175
    msg = rstats.fail_msg
1176
    if msg:
1177
      lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
1178
      retries += 1
1179
      if retries >= 10:
1180
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1181
                                 " aborting." % node_name)
1182
      time.sleep(6)
1183
      continue
1184
    rstats = rstats.payload
1185
    retries = 0
1186
    for i, mstat in enumerate(rstats):
1187
      if mstat is None:
1188
        lu.LogWarning("Can't compute data for node %s/%s",
1189
                      node_name, disks[i].iv_name)
1190
        continue
1191

    
1192
      cumul_degraded = (cumul_degraded or
1193
                        (mstat.is_degraded and mstat.sync_percent is None))
1194
      if mstat.sync_percent is not None:
1195
        done = False
1196
        if mstat.estimated_time is not None:
1197
          rem_time = ("%s remaining (estimated)" %
1198
                      utils.FormatSeconds(mstat.estimated_time))
1199
          max_time = mstat.estimated_time
1200
        else:
1201
          rem_time = "no time estimate"
1202
        lu.LogInfo("- device %s: %5.2f%% done, %s",
1203
                   disks[i].iv_name, mstat.sync_percent, rem_time)
1204

    
1205
    # if we're done but degraded, let's do a few small retries, to
1206
    # make sure we see a stable and not transient situation; therefore
1207
    # we force restart of the loop
1208
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
1209
      logging.info("Degraded disks found, %d retries left", degr_retries)
1210
      degr_retries -= 1
1211
      time.sleep(1)
1212
      continue
1213

    
1214
    if done or oneshot:
1215
      break
1216

    
1217
    time.sleep(min(60, max_time))
1218

    
1219
  if done:
1220
    lu.LogInfo("Instance %s's disks are in sync", instance.name)
1221

    
1222
  return not cumul_degraded
1223

    
1224

    
1225
def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1226
  """Shutdown block devices of an instance.
1227

1228
  This does the shutdown on all nodes of the instance.
1229

1230
  Errors on the primary node are ignored only if ignore_primary is
  true; errors on offline secondary nodes are always ignored.
1232

1233
  """
1234
  all_result = True
1235

    
1236
  if disks is None:
1237
    # only mark instance disks as inactive if all disks are affected
1238
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1239
  inst_disks = lu.cfg.GetInstanceDisks(instance)
1240
  disks = ExpandCheckDisks(inst_disks, disks)
1241

    
1242
  for disk in disks:
1243
    for node_uuid, top_disk in disk.ComputeNodeTree(instance.primary_node):
1244
      result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
1245
      msg = result.fail_msg
1246
      if msg:
1247
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1248
                      disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1249
        if ((node_uuid == instance.primary_node and not ignore_primary) or
1250
            (node_uuid != instance.primary_node and not result.offline)):
1251
          all_result = False
1252
  return all_result
1253

    
1254

    
1255
def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1256
  """Shutdown block devices of an instance.
1257

1258
  This function checks if an instance is running, before calling
1259
  L{ShutdownInstanceDisks}.
1260

1261
  """
1262
  CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1263
  ShutdownInstanceDisks(lu, instance, disks=disks)
1264

    
1265

    
1266
def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1267
                          ignore_size=False):
1268
  """Prepare the block devices for an instance.
1269

1270
  This sets up the block devices on all nodes.
1271

1272
  @type lu: L{LogicalUnit}
1273
  @param lu: the logical unit on whose behalf we execute
1274
  @type instance: L{objects.Instance}
1275
  @param instance: the instance whose disks we assemble
1276
  @type disks: list of L{objects.Disk} or None
1277
  @param disks: which disks to assemble (or all, if None)
1278
  @type ignore_secondaries: boolean
1279
  @param ignore_secondaries: if true, errors on secondary nodes
1280
      won't result in an error return from the function
1281
  @type ignore_size: boolean
1282
  @param ignore_size: if true, the current known size of the disk
1283
      will not be used during the disk activation, useful for cases
1284
      when the size is wrong
1285
  @return: False if the operation failed, otherwise a list of
1286
      (host, instance_visible_name, node_visible_name)
1287
      with the mapping from node devices to instance devices
1288

1289
  """
1290
  device_info = []
1291
  disks_ok = True
1292

    
1293
  if disks is None:
1294
    # only mark instance disks as active if all disks are affected
1295
    lu.cfg.MarkInstanceDisksActive(instance.uuid)
1296

    
1297
  inst_disks = lu.cfg.GetInstanceDisks(instance)
1298
  disks = ExpandCheckDisks(inst_disks, disks)
1299

    
1300
  # With the two passes mechanism we try to reduce the window of
1301
  # opportunity for the race condition of switching DRBD to primary
1302
  # before handshaking occurred, but we do not eliminate it
1303

    
1304
  # The proper fix would be to wait (with some limits) until the
1305
  # connection has been made and drbd transitions from WFConnection
1306
  # into any other network-connected state (Connected, SyncTarget,
1307
  # SyncSource, etc.)
1308

    
1309
  # 1st pass, assemble on all nodes in secondary mode
1310
  for idx, inst_disk in enumerate(disks):
1311
    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
1312
                                  instance.primary_node):
1313
      if ignore_size:
1314
        node_disk = node_disk.Copy()
1315
        node_disk.UnsetSize()
1316
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
1317
                                             instance.name, False, idx)
1318
      msg = result.fail_msg
1319
      if msg:
1320
        secondary_nodes = lu.cfg.GetInstanceSecondaryNodes(instance)
1321
        is_offline_secondary = (node_uuid in secondary_nodes and
1322
                                result.offline)
1323
        lu.LogWarning("Could not prepare block device %s on node %s"
1324
                      " (is_primary=False, pass=1): %s",
1325
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1326
        if not (ignore_secondaries or is_offline_secondary):
1327
          disks_ok = False
1328

    
1329
  # FIXME: race condition on drbd migration to primary
1330

    
1331
  # 2nd pass, do only the primary node
1332
  for idx, inst_disk in enumerate(disks):
1333
    dev_path = None
1334

    
1335
    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
1336
                                  instance.primary_node):
1337
      if node_uuid != instance.primary_node:
1338
        continue
1339
      if ignore_size:
1340
        node_disk = node_disk.Copy()
1341
        node_disk.UnsetSize()
1342
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
1343
                                             instance.name, True, idx)
1344
      msg = result.fail_msg
1345
      if msg:
1346
        lu.LogWarning("Could not prepare block device %s on node %s"
1347
                      " (is_primary=True, pass=2): %s",
1348
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1349
        disks_ok = False
1350
      else:
1351
        dev_path, _ = result.payload
1352

    
1353
    device_info.append((lu.cfg.GetNodeName(instance.primary_node),
1354
                        inst_disk.iv_name, dev_path))
1355

    
1356
  if not disks_ok:
1357
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1358

    
1359
  return disks_ok, device_info
1360

    
1361

    
1362
def StartInstanceDisks(lu, instance, force):
1363
  """Start the disks of an instance.
1364

1365
  """
1366
  disks_ok, _ = AssembleInstanceDisks(lu, instance,
1367
                                      ignore_secondaries=force)
1368
  if not disks_ok:
1369
    ShutdownInstanceDisks(lu, instance)
1370
    if force is not None and not force:
1371
      lu.LogWarning("",
1372
                    hint=("If the message above refers to a secondary node,"
1373
                          " you can retry the operation using '--force'"))
1374
    raise errors.OpExecError("Disk consistency error")


class LUInstanceGrowDisk(LogicalUnit):
1378
  """Grow a disk of an instance.
1379

1380
  """
1381
  HPATH = "disk-grow"
1382
  HTYPE = constants.HTYPE_INSTANCE
1383
  REQ_BGL = False
1384

    
1385
  def ExpandNames(self):
1386
    self._ExpandAndLockInstance()
1387
    self.needed_locks[locking.LEVEL_NODE] = []
1388
    self.needed_locks[locking.LEVEL_NODE_RES] = []
1389
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1390
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
1391

    
1392
  def DeclareLocks(self, level):
1393
    if level == locking.LEVEL_NODE:
1394
      self._LockInstancesNodes()
1395
    elif level == locking.LEVEL_NODE_RES:
1396
      # Copy node locks
1397
      self.needed_locks[locking.LEVEL_NODE_RES] = \
1398
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1399

    
1400
  def BuildHooksEnv(self):
1401
    """Build hooks env.
1402

1403
    This runs on the master, the primary and all the secondaries.
1404

1405
    """
1406
    env = {
1407
      "DISK": self.op.disk,
1408
      "AMOUNT": self.op.amount,
1409
      "ABSOLUTE": self.op.absolute,
1410
      }
1411
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
1412
    return env
1413

    
1414
  def BuildHooksNodes(self):
1415
    """Build hooks nodes.
1416

1417
    """
1418
    nl = [self.cfg.GetMasterNode()] + \
1419
      list(self.cfg.GetInstanceNodes(self.instance))
1420
    return (nl, nl)
1421

    
1422
  def CheckPrereq(self):
1423
    """Check prerequisites.
1424

1425
    This checks that the instance is in the cluster.
1426

1427
    """
1428
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1429
    assert self.instance is not None, \
1430
      "Cannot retrieve locked instance %s" % self.op.instance_name
1431
    node_uuids = list(self.cfg.GetInstanceNodes(self.instance))
1432
    for node_uuid in node_uuids:
1433
      CheckNodeOnline(self, node_uuid)
1434
    self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)
1435

    
1436
    if self.instance.disk_template not in constants.DTS_GROWABLE:
1437
      raise errors.OpPrereqError("Instance's disk layout does not support"
1438
                                 " growing", errors.ECODE_INVAL)
1439

    
1440
    self.disk = self.cfg.GetDiskInfo(self.instance.FindDisk(self.op.disk))
1441

    
1442
    if self.op.absolute:
1443
      self.target = self.op.amount
1444
      self.delta = self.target - self.disk.size
1445
      if self.delta < 0:
1446
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
1447
                                   "current disk size (%s)" %
1448
                                   (utils.FormatUnit(self.target, "h"),
1449
                                    utils.FormatUnit(self.disk.size, "h")),
1450
                                   errors.ECODE_STATE)
1451
    else:
1452
      self.delta = self.op.amount
1453
      self.target = self.disk.size + self.delta
1454
      if self.delta < 0:
1455
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
1456
                                   utils.FormatUnit(self.delta, "h"),
1457
                                   errors.ECODE_INVAL)
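    # Worked example (illustrative figures): growing a 10240 MiB disk with
    # absolute=True and amount=15360 gives delta=5120 and target=15360; the
    # same growth expressed relatively is absolute=False with amount=5120.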
    self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta))
1460

    
1461
  def _CheckDiskSpace(self, node_uuids, req_vgspace):
1462
    template = self.instance.disk_template
1463
    if (template not in (constants.DTS_NO_FREE_SPACE_CHECK) and
1464
        not any(self.node_es_flags.values())):
1465
      # TODO: check the free disk space for file, when that feature will be
1466
      # supported
1467
      # With exclusive storage we need to do something smarter than just looking
1468
      # at free space, which, in the end, is basically a dry run. So we rely on
1469
      # the dry run performed in Exec() instead.
1470
      CheckNodesFreeDiskPerVG(self, node_uuids, req_vgspace)
1471

    
1472
  def Exec(self, feedback_fn):
1473
    """Execute disk grow.
1474

1475
    """
1476
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1477
    assert (self.owned_locks(locking.LEVEL_NODE) ==
1478
            self.owned_locks(locking.LEVEL_NODE_RES))
1479

    
1480
    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1481

    
1482
    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[self.disk])
1483
    if not disks_ok:
1484
      raise errors.OpExecError("Cannot activate block device to grow")
1485

    
1486
    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1487
                (self.op.disk, self.instance.name,
1488
                 utils.FormatUnit(self.delta, "h"),
1489
                 utils.FormatUnit(self.target, "h")))
1490

    
1491
    # First run all grow ops in dry-run mode
1492
    inst_nodes = self.cfg.GetInstanceNodes(self.instance)
1493
    for node_uuid in inst_nodes:
1494
      result = self.rpc.call_blockdev_grow(node_uuid,
1495
                                           (self.disk, self.instance),
1496
                                           self.delta, True, True,
1497
                                           self.node_es_flags[node_uuid])
1498
      result.Raise("Dry-run grow request failed to node %s" %
1499
                   self.cfg.GetNodeName(node_uuid))
1500

    
1501
    if wipe_disks:
1502
      # Get disk size from primary node for wiping
1503
      result = self.rpc.call_blockdev_getdimensions(
1504
                 self.instance.primary_node, [([self.disk], self.instance)])
1505
      result.Raise("Failed to retrieve disk size from node '%s'" %
1506
                   self.instance.primary_node)
1507

    
1508
      (disk_dimensions, ) = result.payload
1509

    
1510
      if disk_dimensions is None:
1511
        raise errors.OpExecError("Failed to retrieve disk size from primary"
1512
                                 " node '%s'" % self.instance.primary_node)
1513
      (disk_size_in_bytes, _) = disk_dimensions
1514

    
1515
      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
1516

    
1517
      assert old_disk_size >= self.disk.size, \
1518
        ("Retrieved disk size too small (got %s, should be at least %s)" %
1519
         (old_disk_size, self.disk.size))
1520
    else:
1521
      old_disk_size = None
1522

    
1523
    # We know that (as far as we can test) operations across different
1524
    # nodes will succeed, time to run it for real on the backing storage
1525
    for node_uuid in inst_nodes:
1526
      result = self.rpc.call_blockdev_grow(node_uuid,
1527
                                           (self.disk, self.instance),
1528
                                           self.delta, False, True,
1529
                                           self.node_es_flags[node_uuid])
1530
      result.Raise("Grow request failed to node %s" %
1531
                   self.cfg.GetNodeName(node_uuid))
1532

    
1533
    # And now execute it for logical storage, on the primary node
1534
    node_uuid = self.instance.primary_node
1535
    result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance),
1536
                                         self.delta, False, False,
1537
                                         self.node_es_flags[node_uuid])
1538
    result.Raise("Grow request failed to node %s" %
1539
                 self.cfg.GetNodeName(node_uuid))
1540

    
1541
    self.disk.RecordGrow(self.delta)
1542
    self.cfg.Update(self.instance, feedback_fn)
1543

    
1544
    # Changes have been recorded, release node lock
1545
    ReleaseLocks(self, locking.LEVEL_NODE)
1546

    
1547
    # Downgrade lock while waiting for sync
1548
    self.glm.downgrade(locking.LEVEL_INSTANCE)
1549

    
1550
    assert wipe_disks ^ (old_disk_size is None)
1551

    
1552
    if wipe_disks:
1553
      inst_disks = self.cfg.GetInstanceDisks(self.instance)
1554
      assert inst_disks[self.op.disk] == self.disk
1555

    
1556
      # Wipe newly added disk space
1557
      WipeDisks(self, self.instance,
1558
                disks=[(self.op.disk, self.disk, old_disk_size)])
1559

    
1560
    if self.op.wait_for_sync:
1561
      disk_abort = not WaitForSync(self, self.instance, disks=[self.disk])
1562
      if disk_abort:
1563
        self.LogWarning("Disk syncing has not returned a good status; check"
1564
                        " the instance")
1565
      if not self.instance.disks_active:
1566
        _SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk])
1567
    elif not self.instance.disks_active:
1568
      self.LogWarning("Not shutting down the disk even if the instance is"
1569
                      " not supposed to be running because no wait for"
1570
                      " sync mode was requested")
1571

    
1572
    assert self.owned_locks(locking.LEVEL_NODE_RES)
1573
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)


class LUInstanceReplaceDisks(LogicalUnit):
1577
  """Replace the disks of an instance.
1578

1579
  """
1580
  HPATH = "mirrors-replace"
1581
  HTYPE = constants.HTYPE_INSTANCE
1582
  REQ_BGL = False
1583

    
1584
  def CheckArguments(self):
1585
    """Check arguments.
1586

1587
    """
1588
    if self.op.mode == constants.REPLACE_DISK_CHG:
1589
      if self.op.remote_node is None and self.op.iallocator is None:
1590
        raise errors.OpPrereqError("When changing the secondary either an"
1591
                                   " iallocator script must be used or the"
1592
                                   " new node given", errors.ECODE_INVAL)
1593
      else:
1594
        CheckIAllocatorOrNode(self, "iallocator", "remote_node")
1595

    
1596
    elif self.op.remote_node is not None or self.op.iallocator is not None:
1597
      # Not replacing the secondary
1598
      raise errors.OpPrereqError("The iallocator and new node options can"
1599
                                 " only be used when changing the"
1600
                                 " secondary node", errors.ECODE_INVAL)
1601

    
1602
  def ExpandNames(self):
1603
    self._ExpandAndLockInstance()
1604

    
1605
    assert locking.LEVEL_NODE not in self.needed_locks
1606
    assert locking.LEVEL_NODE_RES not in self.needed_locks
1607
    assert locking.LEVEL_NODEGROUP not in self.needed_locks
1608

    
1609
    assert self.op.iallocator is None or self.op.remote_node is None, \
1610
      "Conflicting options"
1611

    
1612
    if self.op.remote_node is not None:
1613
      (self.op.remote_node_uuid, self.op.remote_node) = \
1614
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
1615
                              self.op.remote_node)
1616

    
1617
      # Warning: do not remove the locking of the new secondary here
1618
      # unless DRBD8Dev.AddChildren is changed to work in parallel;
1619
      # currently it doesn't since parallel invocations of
1620
      # FindUnusedMinor will conflict
1621
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid]
1622
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1623
    else:
1624
      self.needed_locks[locking.LEVEL_NODE] = []
1625
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1626

    
1627
      if self.op.iallocator is not None:
1628
        # iallocator will select a new node in the same group
1629
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
1630
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1631

    
1632
    self.needed_locks[locking.LEVEL_NODE_RES] = []
1633

    
1634
    self.replacer = TLReplaceDisks(self, self.op.instance_uuid,
1635
                                   self.op.instance_name, self.op.mode,
1636
                                   self.op.iallocator, self.op.remote_node_uuid,
1637
                                   self.op.disks, self.op.early_release,
1638
                                   self.op.ignore_ipolicy)
1639

    
1640
    self.tasklets = [self.replacer]
1641

    
1642
  def DeclareLocks(self, level):
1643
    if level == locking.LEVEL_NODEGROUP:
1644
      assert self.op.remote_node_uuid is None
1645
      assert self.op.iallocator is not None
1646
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
1647

    
1648
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
1649
      # Lock all groups used by instance optimistically; this requires going
1650
      # via the node before it's locked, requiring verification later on
1651
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
1652
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid)
1653

    
1654
    elif level == locking.LEVEL_NODE:
1655
      if self.op.iallocator is not None:
1656
        assert self.op.remote_node_uuid is None
1657
        assert not self.needed_locks[locking.LEVEL_NODE]
1658
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
1659

    
1660
        # Lock member nodes of all locked groups
1661
        self.needed_locks[locking.LEVEL_NODE] = \
1662
          [node_uuid
1663
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1664
           for node_uuid in self.cfg.GetNodeGroup(group_uuid).members]
1665
      else:
1666
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1667

    
1668
        self._LockInstancesNodes()
1669

    
1670
    elif level == locking.LEVEL_NODE_RES:
1671
      # Reuse node locks
1672
      self.needed_locks[locking.LEVEL_NODE_RES] = \
1673
        self.needed_locks[locking.LEVEL_NODE]
1674

    
1675
  def BuildHooksEnv(self):
1676
    """Build hooks env.
1677

1678
    This runs on the master, the primary and all the secondaries.
1679

1680
    """
1681
    instance = self.replacer.instance
1682
    secondary_nodes = self.cfg.GetInstanceSecondaryNodes(instance)
1683
    env = {
1684
      "MODE": self.op.mode,
1685
      "NEW_SECONDARY": self.op.remote_node,
1686
      "OLD_SECONDARY": self.cfg.GetNodeName(secondary_nodes[0]),
1687
      }
1688
    env.update(BuildInstanceHookEnvByObject(self, instance))
1689
    return env
1690

    
1691
  def BuildHooksNodes(self):
1692
    """Build hooks nodes.
1693

1694
    """
1695
    instance = self.replacer.instance
1696
    nl = [
1697
      self.cfg.GetMasterNode(),
1698
      instance.primary_node,
1699
      ]
1700
    if self.op.remote_node_uuid is not None:
1701
      nl.append(self.op.remote_node_uuid)
1702
    return nl, nl
1703

    
1704
  def CheckPrereq(self):
1705
    """Check prerequisites.
1706

1707
    """
1708
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
1709
            self.op.iallocator is None)
1710

    
1711
    # Verify if node group locks are still correct
1712
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
1713
    if owned_groups:
1714
      CheckInstanceNodeGroups(self.cfg, self.op.instance_uuid, owned_groups)
1715

    
1716
    return LogicalUnit.CheckPrereq(self)


class LUInstanceActivateDisks(NoHooksLU):
1720
  """Bring up an instance's disks.
1721

1722
  """
1723
  REQ_BGL = False
1724

    
1725
  def ExpandNames(self):
1726
    self._ExpandAndLockInstance()
1727
    self.needed_locks[locking.LEVEL_NODE] = []
1728
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1729

    
1730
  def DeclareLocks(self, level):
1731
    if level == locking.LEVEL_NODE:
1732
      self._LockInstancesNodes()
1733

    
1734
  def CheckPrereq(self):
1735
    """Check prerequisites.
1736

1737
    This checks that the instance is in the cluster.
1738

1739
    """
1740
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1741
    assert self.instance is not None, \
1742
      "Cannot retrieve locked instance %s" % self.op.instance_name
1743
    CheckNodeOnline(self, self.instance.primary_node)
1744

    
1745
  def Exec(self, feedback_fn):
1746
    """Activate the disks.
1747

1748
    """
1749
    disks_ok, disks_info = \
1750
              AssembleInstanceDisks(self, self.instance,
1751
                                    ignore_size=self.op.ignore_size)
1752
    if not disks_ok:
1753
      raise errors.OpExecError("Cannot activate block devices")
1754

    
1755
    if self.op.wait_for_sync:
1756
      if not WaitForSync(self, self.instance):
1757
        self.cfg.MarkInstanceDisksInactive(self.instance.uuid)
1758
        raise errors.OpExecError("Some disks of the instance are degraded!")
1759

    
1760
    return disks_info


class LUInstanceDeactivateDisks(NoHooksLU):
1764
  """Shutdown an instance's disks.
1765

1766
  """
1767
  REQ_BGL = False
1768

    
1769
  def ExpandNames(self):
1770
    self._ExpandAndLockInstance()
1771
    self.needed_locks[locking.LEVEL_NODE] = []
1772
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1773

    
1774
  def DeclareLocks(self, level):
1775
    if level == locking.LEVEL_NODE:
1776
      self._LockInstancesNodes()
1777

    
1778
  def CheckPrereq(self):
1779
    """Check prerequisites.
1780

1781
    This checks that the instance is in the cluster.
1782

1783
    """
1784
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1785
    assert self.instance is not None, \
1786
      "Cannot retrieve locked instance %s" % self.op.instance_name
1787

    
1788
  def Exec(self, feedback_fn):
1789
    """Deactivate the disks
1790

1791
    """
1792
    if self.op.force:
1793
      ShutdownInstanceDisks(self, self.instance)
1794
    else:
1795
      _SafeShutdownInstanceDisks(self, self.instance)


def _CheckDiskConsistencyInner(lu, instance, dev, node_uuid, on_primary,
1799
                               ldisk=False):
1800
  """Check that mirrors are not degraded.
1801

1802
  @attention: The device has to be annotated already.
1803

1804
  The ldisk parameter, if True, will change the test from the
1805
  is_degraded attribute (which represents overall non-ok status for
1806
  the device(s)) to the ldisk (representing the local storage status).
1807

1808
  """
1809
  result = True
1810

    
1811
  if on_primary or dev.AssembleOnSecondary():
1812
    rstats = lu.rpc.call_blockdev_find(node_uuid, (dev, instance))
1813
    msg = rstats.fail_msg
1814
    if msg:
1815
      lu.LogWarning("Can't find disk on node %s: %s",
1816
                    lu.cfg.GetNodeName(node_uuid), msg)
1817
      result = False
1818
    elif not rstats.payload:
1819
      lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid))
1820
      result = False
1821
    else:
1822
      if ldisk:
1823
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
1824
      else:
1825
        result = result and not rstats.payload.is_degraded
1826

    
1827
  if dev.children:
1828
    for child in dev.children:
1829
      result = result and _CheckDiskConsistencyInner(lu, instance, child,
1830
                                                     node_uuid, on_primary)
1831

    
1832
  return result


def CheckDiskConsistency(lu, instance, dev, node_uuid, on_primary, ldisk=False):
1836
  """Wrapper around L{_CheckDiskConsistencyInner}.
1837

1838
  """
1839
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1840
  return _CheckDiskConsistencyInner(lu, instance, disk, node_uuid, on_primary,
1841
                                    ldisk=ldisk)
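# Typical call pattern (a sketch based on _CheckDisksConsistency in
# TLReplaceDisks below); ldisk=True would restrict the check to the local
# storage status instead of the overall degradation state:
#   ok = CheckDiskConsistency(lu, instance, dev, other_node_uuid,
#                             other_node_uuid == instance.primary_node)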


def _BlockdevFind(lu, node_uuid, dev, instance):
1845
  """Wrapper around call_blockdev_find to annotate diskparams.
1846

1847
  @param lu: A reference to the lu object
1848
  @param node_uuid: The node to call out
1849
  @param dev: The device to find
1850
  @param instance: The instance object the device belongs to
1851
  @returns The result of the rpc call
1852

1853
  """
1854
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1855
  return lu.rpc.call_blockdev_find(node_uuid, (disk, instance))


def _GenerateUniqueNames(lu, exts):
  """Generate suitable LV names.

  This will generate unique logical volume names for the given suffixes.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
    results.append("%s%s" % (new_id, val))
  return results
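# Illustrative result (the unique ID shown is a placeholder; real values come
# from lu.cfg.GenerateUniqueID):
#   _GenerateUniqueNames(lu, [".disk0_data", ".disk0_meta"])
#   -> ["<unique-id>.disk0_data", "<unique-id>.disk0_meta"]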


class TLReplaceDisks(Tasklet):
1872
  """Replaces disks for an instance.
1873

1874
  Note: Locking is not within the scope of this class.
1875

1876
  """
1877
  def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name,
1878
               remote_node_uuid, disks, early_release, ignore_ipolicy):
1879
    """Initializes this class.
1880

1881
    """
1882
    Tasklet.__init__(self, lu)
1883

    
1884
    # Parameters
1885
    self.instance_uuid = instance_uuid
1886
    self.instance_name = instance_name
1887
    self.mode = mode
1888
    self.iallocator_name = iallocator_name
1889
    self.remote_node_uuid = remote_node_uuid
1890
    self.disks = disks
1891
    self.early_release = early_release
1892
    self.ignore_ipolicy = ignore_ipolicy
1893

    
1894
    # Runtime data
1895
    self.instance = None
1896
    self.new_node_uuid = None
1897
    self.target_node_uuid = None
1898
    self.other_node_uuid = None
1899
    self.remote_node_info = None
1900
    self.node_secondary_ip = None
1901

    
1902
  @staticmethod
1903
  def _RunAllocator(lu, iallocator_name, instance_uuid,
1904
                    relocate_from_node_uuids):
1905
    """Compute a new secondary node using an IAllocator.
1906

1907
    """
1908
    req = iallocator.IAReqRelocate(
1909
          inst_uuid=instance_uuid,
1910
          relocate_from_node_uuids=list(relocate_from_node_uuids))
1911
    ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
1912

    
1913
    ial.Run(iallocator_name)
1914

    
1915
    if not ial.success:
1916
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
1917
                                 " %s" % (iallocator_name, ial.info),
1918
                                 errors.ECODE_NORES)
1919

    
1920
    remote_node_name = ial.result[0]
1921
    remote_node = lu.cfg.GetNodeInfoByName(remote_node_name)
1922

    
1923
    if remote_node is None:
1924
      raise errors.OpPrereqError("Node %s not found in configuration" %
1925
                                 remote_node_name, errors.ECODE_NOENT)
1926

    
1927
    lu.LogInfo("Selected new secondary for instance '%s': %s",
1928
               instance_uuid, remote_node_name)
1929

    
1930
    return remote_node.uuid
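    # Illustrative: for a DRBD8 instance whose current secondary is node A,
    # CheckPrereq below calls this with relocate_from_node_uuids == [A] and
    # the allocator proposes a replacement secondary in the same node group
    # (see LUInstanceReplaceDisks.ExpandNames).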
  def _FindFaultyDisks(self, node_uuid):
1933
    """Wrapper for L{FindFaultyInstanceDisks}.
1934

1935
    """
1936
    return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
1937
                                   node_uuid, True)
1938

    
1939
  def _CheckDisksActivated(self, instance):
1940
    """Checks if the instance disks are activated.
1941

1942
    @param instance: The instance to check disks
1943
    @return: True if they are activated, False otherwise
1944

1945
    """
1946
    node_uuids = self.cfg.GetInstanceNodes(instance)
1947

    
1948
    for idx, dev in enumerate(self.cfg.GetInstanceDisks(instance)):
1949
      for node_uuid in node_uuids:
1950
        self.lu.LogInfo("Checking disk/%d on %s", idx,
1951
                        self.cfg.GetNodeName(node_uuid))
1952

    
1953
        result = _BlockdevFind(self, node_uuid, dev, instance)
1954

    
1955
        if result.offline:
1956
          continue
1957
        elif result.fail_msg or not result.payload:
1958
          return False
1959

    
1960
    return True
1961

    
1962
  def CheckPrereq(self):
1963
    """Check prerequisites.
1964

1965
    This checks that the instance is in the cluster.
1966

1967
    """
1968
    self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
1969
    assert self.instance is not None, \
1970
      "Cannot retrieve locked instance %s" % self.instance_name
1971

    
1972
    if self.instance.disk_template != constants.DT_DRBD8:
1973
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
1974
                                 " instances", errors.ECODE_INVAL)
1975

    
1976
    secondary_nodes = self.cfg.GetInstanceSecondaryNodes(self.instance)
1977
    if len(secondary_nodes) != 1:
1978
      raise errors.OpPrereqError("The instance has a strange layout,"
1979
                                 " expected one secondary but found %d" %
1980
                                 len(secondary_nodes),
1981
                                 errors.ECODE_FAULT)
1982

    
1983
    secondary_node_uuid = secondary_nodes[0]
1984

    
1985
    if self.iallocator_name is None:
1986
      remote_node_uuid = self.remote_node_uuid
1987
    else:
1988
      remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
1989
                                            self.instance.uuid,
1990
                                            secondary_nodes)
1991

    
1992
    if remote_node_uuid is None:
1993
      self.remote_node_info = None
1994
    else:
1995
      assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
1996
             "Remote node '%s' is not locked" % remote_node_uuid
1997

    
1998
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
1999
      assert self.remote_node_info is not None, \
2000
        "Cannot retrieve locked node %s" % remote_node_uuid
2001

    
2002
    if remote_node_uuid == self.instance.primary_node:
2003
      raise errors.OpPrereqError("The specified node is the primary node of"
2004
                                 " the instance", errors.ECODE_INVAL)
2005

    
2006
    if remote_node_uuid == secondary_node_uuid:
2007
      raise errors.OpPrereqError("The specified node is already the"
2008
                                 " secondary node of the instance",
2009
                                 errors.ECODE_INVAL)
2010

    
2011
    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
2012
                                    constants.REPLACE_DISK_CHG):
2013
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
2014
                                 errors.ECODE_INVAL)
2015

    
2016
    if self.mode == constants.REPLACE_DISK_AUTO:
2017
      if not self._CheckDisksActivated(self.instance):
2018
        raise errors.OpPrereqError("Please run activate-disks on instance %s"
2019
                                   " first" % self.instance_name,
2020
                                   errors.ECODE_STATE)
2021
      faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
2022
      faulty_secondary = self._FindFaultyDisks(secondary_node_uuid)
2023

    
2024
      if faulty_primary and faulty_secondary:
2025
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
2026
                                   " one node and can not be repaired"
2027
                                   " automatically" % self.instance_name,
2028
                                   errors.ECODE_STATE)
2029

    
2030
      if faulty_primary:
2031
        self.disks = faulty_primary
2032
        self.target_node_uuid = self.instance.primary_node
2033
        self.other_node_uuid = secondary_node_uuid
2034
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
2035
      elif faulty_secondary:
2036
        self.disks = faulty_secondary
2037
        self.target_node_uuid = secondary_node_uuid
2038
        self.other_node_uuid = self.instance.primary_node
2039
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
2040
      else:
2041
        self.disks = []
2042
        check_nodes = []
2043

    
2044
    else:
2045
      # Non-automatic modes
2046
      if self.mode == constants.REPLACE_DISK_PRI:
2047
        self.target_node_uuid = self.instance.primary_node
2048
        self.other_node_uuid = secondary_node_uuid
2049
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
2050

    
2051
      elif self.mode == constants.REPLACE_DISK_SEC:
2052
        self.target_node_uuid = secondary_node_uuid
2053
        self.other_node_uuid = self.instance.primary_node
2054
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
2055

    
2056
      elif self.mode == constants.REPLACE_DISK_CHG:
2057
        self.new_node_uuid = remote_node_uuid
2058
        self.other_node_uuid = self.instance.primary_node
2059
        self.target_node_uuid = secondary_node_uuid
2060
        check_nodes = [self.new_node_uuid, self.other_node_uuid]
2061

    
2062
        CheckNodeNotDrained(self.lu, remote_node_uuid)
2063
        CheckNodeVmCapable(self.lu, remote_node_uuid)
2064

    
2065
        old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid)
2066
        assert old_node_info is not None
2067
        if old_node_info.offline and not self.early_release:
2068
          # doesn't make sense to delay the release
2069
          self.early_release = True
2070
          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
2071
                          " early-release mode", secondary_node_uuid)
2072

    
2073
      else:
2074
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
2075
                                     self.mode)
2076

    
2077
      # If not specified all disks should be replaced
2078
      if not self.disks:
2079
        self.disks = range(len(self.instance.disks))
2080

    
2081
    # TODO: This is ugly, but right now we can't distinguish between internal
2082
    # submitted opcode and external one. We should fix that.
2083
    if self.remote_node_info:
2084
      # We change the node, lets verify it still meets instance policy
2085
      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
2086
      cluster = self.cfg.GetClusterInfo()
2087
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2088
                                                              new_group_info)
2089
      CheckTargetNodeIPolicy(self, ipolicy, self.instance,
2090
                             self.remote_node_info, self.cfg,
2091
                             ignore=self.ignore_ipolicy)
2092

    
2093
    for node_uuid in check_nodes:
2094
      CheckNodeOnline(self.lu, node_uuid)
2095

    
2096
    touched_nodes = frozenset(node_uuid for node_uuid in [self.new_node_uuid,
2097
                                                          self.other_node_uuid,
2098
                                                          self.target_node_uuid]
2099
                              if node_uuid is not None)
2100

    
2101
    # Release unneeded node and node resource locks
2102
    ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
2103
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
2104
    ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
2105

    
2106
    # Release any owned node group
2107
    ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
2108

    
2109
    # Check whether disks are valid
2110
    for disk_idx in self.disks:
2111
      self.instance.FindDisk(disk_idx)
2112

    
2113
    # Get secondary node IP addresses
2114
    self.node_secondary_ip = dict((uuid, node.secondary_ip) for (uuid, node)
2115
                                  in self.cfg.GetMultiNodeInfo(touched_nodes))
2116

    
2117
  def Exec(self, feedback_fn):
2118
    """Execute disk replacement.
2119

2120
    This dispatches the disk replacement to the appropriate handler.
2121

2122
    """
2123
    if __debug__:
2124
      # Verify owned locks before starting operation
2125
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
2126
      assert set(owned_nodes) == set(self.node_secondary_ip), \
2127
          ("Incorrect node locks, owning %s, expected %s" %
2128
           (owned_nodes, self.node_secondary_ip.keys()))
2129
      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
2130
              self.lu.owned_locks(locking.LEVEL_NODE_RES))
2131
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
2132

    
2133
      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
2134
      assert list(owned_instances) == [self.instance_name], \
2135
          "Instance '%s' not locked" % self.instance_name
2136

    
2137
      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
2138
          "Should not own any node group lock at this point"
2139

    
2140
    if not self.disks:
2141
      feedback_fn("No disks need replacement for instance '%s'" %
2142
                  self.instance.name)
2143
      return
2144

    
2145
    feedback_fn("Replacing disk(s) %s for instance '%s'" %
2146
                (utils.CommaJoin(self.disks), self.instance.name))
2147
    feedback_fn("Current primary node: %s" %
2148
                self.cfg.GetNodeName(self.instance.primary_node))
2149
    secondary_nodes = self.cfg.GetInstanceSecondaryNodes(self.instance)
2150
    feedback_fn("Current seconary node: %s" %
2151
                utils.CommaJoin(self.cfg.GetNodeNames(secondary_nodes)))
2152

    
2153
    activate_disks = not self.instance.disks_active
2154

    
2155
    # Activate the instance disks if we're replacing them on a down instance
2156
    if activate_disks:
2157
      StartInstanceDisks(self.lu, self.instance, True)
2158

    
2159
    try:
2160
      # Should we replace the secondary node?
2161
      if self.new_node_uuid is not None:
2162
        fn = self._ExecDrbd8Secondary
2163
      else:
2164
        fn = self._ExecDrbd8DiskOnly
2165

    
2166
      result = fn(feedback_fn)
2167
    finally:
2168
      # Deactivate the instance disks if we're replacing them on a
2169
      # down instance
2170
      if activate_disks:
2171
        _SafeShutdownInstanceDisks(self.lu, self.instance)
2172

    
2173
    assert not self.lu.owned_locks(locking.LEVEL_NODE)
2174

    
2175
    if __debug__:
2176
      # Verify owned locks
2177
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
2178
      nodes = frozenset(self.node_secondary_ip)
2179
      assert ((self.early_release and not owned_nodes) or
2180
              (not self.early_release and not (set(owned_nodes) - nodes))), \
2181
        ("Not owning the correct locks, early_release=%s, owned=%r,"
2182
         " nodes=%r" % (self.early_release, owned_nodes, nodes))
2183

    
2184
    return result
2185

    
2186
  def _CheckVolumeGroup(self, node_uuids):
2187
    self.lu.LogInfo("Checking volume groups")
2188

    
2189
    vgname = self.cfg.GetVGName()
2190

    
2191
    # Make sure volume group exists on all involved nodes
2192
    results = self.rpc.call_vg_list(node_uuids)
2193
    if not results:
2194
      raise errors.OpExecError("Can't list volume groups on the nodes")
2195

    
2196
    for node_uuid in node_uuids:
2197
      res = results[node_uuid]
2198
      res.Raise("Error checking node %s" % self.cfg.GetNodeName(node_uuid))
2199
      if vgname not in res.payload:
2200
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
2201
                                 (vgname, self.cfg.GetNodeName(node_uuid)))
2202

    
2203
  def _CheckDisksExistence(self, node_uuids):
    # Check disk existence
    for idx, dev in enumerate(self.cfg.GetInstanceDisks(self.instance)):
      if idx not in self.disks:
        continue

      for node_uuid in node_uuids:
        self.lu.LogInfo("Checking disk/%d on %s", idx,
                        self.cfg.GetNodeName(node_uuid))

        result = _BlockdevFind(self, node_uuid, dev, self.instance)

        msg = result.fail_msg
        if msg or not result.payload:
          if not msg:
            msg = "disk not found"
          if not self._CheckDisksActivated(self.instance):
            extra_hint = ("\nDisks do not seem to be properly activated. Try"
                          " running activate-disks on the instance before"
                          " using replace-disks.")
          else:
            extra_hint = ""
          raise errors.OpExecError("Can't find disk/%d on node %s: %s%s" %
                                   (idx, self.cfg.GetNodeName(node_uuid), msg,
                                    extra_hint))
2228

    
2229
  def _CheckDisksConsistency(self, node_uuid, on_primary, ldisk):
2230
    for idx, dev in enumerate(self.cfg.GetInstanceDisks(self.instance)):
2231
      if idx not in self.disks:
2232
        continue
2233

    
2234
      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
2235
                      (idx, self.cfg.GetNodeName(node_uuid)))
2236

    
2237
      if not CheckDiskConsistency(self.lu, self.instance, dev, node_uuid,
2238
                                  on_primary, ldisk=ldisk):
2239
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
2240
                                 " replace disks for instance %s" %
2241
                                 (self.cfg.GetNodeName(node_uuid),
2242
                                  self.instance.name))
2243

    
2244
  def _CreateNewStorage(self, node_uuid):
2245
    """Create new storage on the primary or secondary node.
2246

2247
    This is only used for same-node replaces, not for changing the
2248
    secondary node, hence we don't want to modify the existing disk.
2249

2250
    """
2251
    iv_names = {}
2252

    
2253
    inst_disks = self.cfg.GetInstanceDisks(self.instance)
2254
    disks = AnnotateDiskParams(self.instance, inst_disks, self.cfg)
2255
    for idx, dev in enumerate(disks):
2256
      if idx not in self.disks:
2257
        continue
2258

    
2259
      self.lu.LogInfo("Adding storage on %s for disk/%d",
2260
                      self.cfg.GetNodeName(node_uuid), idx)
2261

    
2262
      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
2263
      names = _GenerateUniqueNames(self.lu, lv_names)
2264

    
2265
      (data_disk, meta_disk) = dev.children
2266
      vg_data = data_disk.logical_id[0]
2267
      lv_data = objects.Disk(dev_type=constants.DT_PLAIN, size=dev.size,
2268
                             logical_id=(vg_data, names[0]),
2269
                             params=data_disk.params)
2270
      vg_meta = meta_disk.logical_id[0]
2271
      lv_meta = objects.Disk(dev_type=constants.DT_PLAIN,
2272
                             size=constants.DRBD_META_SIZE,
2273
                             logical_id=(vg_meta, names[1]),
2274
                             params=meta_disk.params)
2275

    
2276
      new_lvs = [lv_data, lv_meta]
2277
      old_lvs = [child.Copy() for child in dev.children]
2278
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
2279
      excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)
2280

    
2281
      # we pass force_create=True to force the LVM creation
2282
      for new_lv in new_lvs:
2283
        try:
2284
          _CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True,
2285
                               GetInstanceInfoText(self.instance), False,
2286
                               excl_stor)
2287
        except errors.DeviceCreationError, e:
2288
          raise errors.OpExecError("Can't create block device: %s" % e.message)
2289

    
2290
    return iv_names
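    # The returned mapping is keyed by iv_name (e.g. "disk/0") and holds
    # (drbd_dev, old_lvs, new_lvs) tuples; it is consumed later by
    # _CheckDevices and _RemoveOldStorage.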
  def _CheckDevices(self, node_uuid, iv_names):
2293
    for name, (dev, _, _) in iv_names.iteritems():
2294
      result = _BlockdevFind(self, node_uuid, dev, self.instance)
2295

    
2296
      msg = result.fail_msg
2297
      if msg or not result.payload:
2298
        if not msg:
2299
          msg = "disk not found"
2300
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
2301
                                 (name, msg))
2302

    
2303
      if result.payload.is_degraded:
2304
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
2305

    
2306
  def _RemoveOldStorage(self, node_uuid, iv_names):
2307
    for name, (_, old_lvs, _) in iv_names.iteritems():
2308
      self.lu.LogInfo("Remove logical volumes for %s", name)
2309

    
2310
      for lv in old_lvs:
2311
        msg = self.rpc.call_blockdev_remove(node_uuid, (lv, self.instance)) \
2312
                .fail_msg
2313
        if msg:
2314
          self.lu.LogWarning("Can't remove old LV: %s", msg,
2315
                             hint="remove unused LVs manually")
2316

    
2317
  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
2318
    """Replace a disk on the primary or secondary for DRBD 8.
2319

2320
    The algorithm for replace is quite complicated:
2321

2322
      1. for each disk to be replaced:
2323

2324
        1. create new LVs on the target node with unique names
2325
        1. detach old LVs from the drbd device
2326
        1. rename old LVs to name_replaced.<time_t>
2327
        1. rename new LVs to old LVs
2328
        1. attach the new LVs (with the old names now) to the drbd device
2329

2330
      1. wait for sync across all devices
2331

2332
      1. for each modified disk:
2333

2334
        1. remove old LVs (which have the name name_replaces.<time_t>)
2335

2336
    Failures are not very well handled.
2337

2338
    """
2339
    steps_total = 6
2340

    
2341
    # Step: check device activation
2342
    self.lu.LogStep(1, steps_total, "Check device existence")
2343
    self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
2344
    self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])
2345

    
2346
    # Step: check other node consistency
2347
    self.lu.LogStep(2, steps_total, "Check peer consistency")
2348
    self._CheckDisksConsistency(
2349
      self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
2350
      False)
2351

    
2352
    # Step: create new storage
2353
    self.lu.LogStep(3, steps_total, "Allocate new storage")
2354
    iv_names = self._CreateNewStorage(self.target_node_uuid)
2355

    
2356
    # Step: for each lv, detach+rename*2+attach
2357
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2358
    for dev, old_lvs, new_lvs in iv_names.itervalues():
2359
      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
2360

    
2361
      result = self.rpc.call_blockdev_removechildren(self.target_node_uuid,
2362
                                                     (dev, self.instance),
2363
                                                     (old_lvs, self.instance))
2364
      result.Raise("Can't detach drbd from local storage on node"
2365
                   " %s for device %s" %
2366
                   (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))
2367
      #dev.children = []
2368
      #cfg.Update(instance)
2369

    
2370
      # ok, we created the new LVs, so now we know we have the needed
2371
      # storage; as such, we proceed on the target node to rename
2372
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
2373
      # using the assumption that logical_id == unique_id on that node
2374

    
2375
      # FIXME(iustin): use a better name for the replaced LVs
2376
      temp_suffix = int(time.time())
2377
      ren_fn = lambda d, suff: (d.logical_id[0],
2378
                                d.logical_id[1] + "_replaced-%s" % suff)
2379

    
2380
      # Build the rename list based on what LVs exist on the node
2381
      rename_old_to_new = []
2382
      for to_ren in old_lvs:
2383
        result = self.rpc.call_blockdev_find(self.target_node_uuid,
2384
                                             (to_ren, self.instance))
2385
        if not result.fail_msg and result.payload:
2386
          # device exists
2387
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
2388

    
2389
      self.lu.LogInfo("Renaming the old LVs on the target node")
2390
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2391
                                             rename_old_to_new)
2392
      result.Raise("Can't rename old LVs on node %s" %
2393
                   self.cfg.GetNodeName(self.target_node_uuid))
2394

    
2395
      # Now we rename the new LVs to the old LVs
2396
      self.lu.LogInfo("Renaming the new LVs on the target node")
2397
      rename_new_to_old = [(new, old.logical_id)
2398
                           for old, new in zip(old_lvs, new_lvs)]
2399
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2400
                                             rename_new_to_old)
2401
      result.Raise("Can't rename new LVs on node %s" %
2402
                   self.cfg.GetNodeName(self.target_node_uuid))
2403

    
2404
      # Intermediate steps of in memory modifications
2405
      for old, new in zip(old_lvs, new_lvs):
2406
        new.logical_id = old.logical_id
2407

    
2408
      # We need to modify old_lvs so that removal later removes the
2409
      # right LVs, not the newly added ones; note that old_lvs is a
2410
      # copy here
2411
      for disk in old_lvs:
2412
        disk.logical_id = ren_fn(disk, temp_suffix)
2413

    
2414
      # Now that the new lvs have the old name, we can add them to the device
2415
      self.lu.LogInfo("Adding new mirror component on %s",
2416
                      self.cfg.GetNodeName(self.target_node_uuid))
2417
      result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
2418
                                                  (dev, self.instance),
2419
                                                  (new_lvs, self.instance))
2420
      msg = result.fail_msg
2421
      if msg:
2422
        for new_lv in new_lvs:
2423
          msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
2424
                                               (new_lv, self.instance)).fail_msg
2425
          if msg2:
2426
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
2427
                               hint=("cleanup manually the unused logical"
2428
                                     "volumes"))
2429
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
2430

    
2431
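    # Steps 5 and 6 are numbered via itertools.count because their order
    # depends on self.early_release: the old storage is removed either
    # before or after waiting for the resync.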
    cstep = itertools.count(5)
2432

    
2433
    if self.early_release:
2434
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2435
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
2436
      # TODO: Check if releasing locks early still makes sense
2437
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2438
    else:
2439
      # Release all resource locks except those used by the instance
2440
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2441
                   keep=self.node_secondary_ip.keys())
2442

    
2443
    # Release all node locks while waiting for sync
2444
    ReleaseLocks(self.lu, locking.LEVEL_NODE)
2445

    
2446
    # TODO: Can the instance lock be downgraded here? Take the optional disk
2447
    # shutdown in the caller into consideration.
2448

    
2449
    # Wait for sync
2450
    # This can fail as the old devices are degraded and _WaitForSync
2451
    # does a combined result over all disks, so we don't check its return value
2452
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2453
    WaitForSync(self.lu, self.instance)
2454

    
2455
    # Check all devices manually
2456
    self._CheckDevices(self.instance.primary_node, iv_names)
2457

    
2458
    # Step: remove old storage
2459
    if not self.early_release:
2460
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2461
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
2462

    
2463
  def _ExecDrbd8Secondary(self, feedback_fn):
2464
    """Replace the secondary node for DRBD 8.
2465

2466
    The algorithm for replace is quite complicated:
2467
      - for all disks of the instance:
2468
        - create new LVs on the new node with same names
2469
        - shutdown the drbd device on the old secondary
2470
        - disconnect the drbd network on the primary
2471
        - create the drbd device on the new secondary
2472
        - network attach the drbd on the primary, using an artifice:
2473
          the drbd code for Attach() will connect to the network if it
2474
          finds a device which is connected to the good local disks but
2475
          not network enabled
2476
      - wait for sync across all devices
2477
      - remove all disks from the old secondary
2478

2479
    Failures are not very well handled.
2480

2481
    """
2482
    steps_total = 6
2483

    
2484
    pnode = self.instance.primary_node
2485

    
2486
    # Step: check device activation
2487
    self.lu.LogStep(1, steps_total, "Check device existence")
2488
    self._CheckDisksExistence([self.instance.primary_node])
2489
    self._CheckVolumeGroup([self.instance.primary_node])
2490

    
2491
    # Step: check other node consistency
2492
    self.lu.LogStep(2, steps_total, "Check peer consistency")
2493
    self._CheckDisksConsistency(self.instance.primary_node, True, True)
2494

    
2495
    # Step: create new storage
2496
    self.lu.LogStep(3, steps_total, "Allocate new storage")
2497
    inst_disks = self.cfg.GetInstanceDisks(self.instance)
2498
    disks = AnnotateDiskParams(self.instance, inst_disks, self.cfg)
2499
    excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg,
2500
                                                  self.new_node_uuid)
2501
    for idx, dev in enumerate(disks):
2502
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
2503
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
2504
      # we pass force_create=True to force LVM creation
2505
      for new_lv in dev.children:
2506
        try:
2507
          _CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance,
2508
                               new_lv, True, GetInstanceInfoText(self.instance),
2509
                               False, excl_stor)
2510
        except errors.DeviceCreationError, e:
2511
          raise errors.OpExecError("Can't create block device: %s" % e.message)
2512

    
2513
    # Step 4: drbd minors and drbd setup changes
2514
    # after this, we must manually remove the drbd minors on both the
2515
    # error and the success paths
2516
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2517
    minors = self.cfg.AllocateDRBDMinor([self.new_node_uuid
2518
                                         for _ in inst_disks],
2519
                                        self.instance.uuid)
2520
    logging.debug("Allocated minors %r", minors)
2521

    
2522
    iv_names = {}
2523
    for idx, (dev, new_minor) in enumerate(zip(inst_disks, minors)):
2524
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
2525
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
2526
      # create new devices on new_node; note that we create two IDs:
2527
      # one without port, so the drbd will be activated without
2528
      # networking information on the new node at this stage, and one
2529
      # with network, for the latter activation in step 4
2530
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
2531
      if self.instance.primary_node == o_node1:
2532
        p_minor = o_minor1
2533
      else:
2534
        assert self.instance.primary_node == o_node2, "Three-node instance?"
2535
        p_minor = o_minor2
2536

    
2537
      new_alone_id = (self.instance.primary_node, self.new_node_uuid, None,
2538
                      p_minor, new_minor, o_secret)
2539
      new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port,
2540
                    p_minor, new_minor, o_secret)
2541

    
2542
      iv_names[idx] = (dev, dev.children, new_net_id)
2543
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
2544
                    new_net_id)
2545
      new_drbd = objects.Disk(dev_type=constants.DT_DRBD8,
2546
                              logical_id=new_alone_id,
2547
                              children=dev.children,
2548
                              size=dev.size,
2549
                              params={})
2550
      (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
2551
                                            self.cfg)
2552
      try:
2553
        CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance,
2554
                             anno_new_drbd,
2555
                             GetInstanceInfoText(self.instance), False,
2556
                             excl_stor)
2557
      except errors.GenericError:
2558
        self.cfg.ReleaseDRBDMinors(self.instance.uuid)
2559
        raise
2560

    
2561
    # We have new devices, shutdown the drbd on the old secondary
2562
    for idx, dev in enumerate(inst_disks):
2563
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
2564
      msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid,
2565
                                            (dev, self.instance)).fail_msg
2566
      if msg:
2567
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
2568
                           "node: %s" % (idx, msg),
2569
                           hint=("Please cleanup this device manually as"
2570
                                 " soon as possible"))
2571

    
2572
    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2573
    result = self.rpc.call_drbd_disconnect_net(
2574
               [pnode], (inst_disks, self.instance))[pnode]
2575

    
2576
    msg = result.fail_msg
2577
    if msg:
2578
      # detaches didn't succeed (unlikely)
2579
      self.cfg.ReleaseDRBDMinors(self.instance.uuid)
2580
      raise errors.OpExecError("Can't detach the disks from the network on"
2581
                               " old node: %s" % (msg,))
2582

    
2583
    # if we managed to detach at least one, we update all the disks of
2584
    # the instance to point to the new secondary
2585
    self.lu.LogInfo("Updating instance configuration")
2586
    for dev, _, new_logical_id in iv_names.itervalues():
2587
      dev.logical_id = new_logical_id
2588
      self.cfg.Update(dev, feedback_fn)
2589

    
2590
    self.cfg.Update(self.instance, feedback_fn)
2591

    
2592
    # Release all node locks (the configuration has been updated)
2593
    ReleaseLocks(self.lu, locking.LEVEL_NODE)
2594

    
2595
    # and now perform the drbd attach
2596
    self.lu.LogInfo("Attaching primary drbds to new secondary"
2597
                    " (standalone => connected)")
2598
    inst_disks = self.cfg.GetInstanceDisks(self.instance)
2599
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2600
                                            self.new_node_uuid],
2601
                                           (inst_disks, self.instance),
2602
                                           self.instance.name,
2603
                                           False)
2604
    for to_node, to_result in result.items():
2605
      msg = to_result.fail_msg
2606
      if msg:
2607
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
2608
                           self.cfg.GetNodeName(to_node), msg,
2609
                           hint=("please do a gnt-instance info to see the"
2610
                                 " status of disks"))
2611

    
2612
    cstep = itertools.count(5)
2613

    
2614
    if self.early_release:
2615
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2616
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
2617
      # TODO: Check if releasing locks early still makes sense
2618
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2619
    else:
2620
      # Release all resource locks except those used by the instance
2621
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2622
                   keep=self.node_secondary_ip.keys())
2623

    
2624
    # TODO: Can the instance lock be downgraded here? Take the optional disk
2625
    # shutdown in the caller into consideration.
2626

    
2627
    # Wait for sync
2628
    # This can fail as the old devices are degraded and _WaitForSync
2629
    # does a combined result over all disks, so we don't check its return value
2630
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2631
    WaitForSync(self.lu, self.instance)
2632

    
2633
    # Check all devices manually
2634
    self._CheckDevices(self.instance.primary_node, iv_names)
2635

    
2636
    # Step: remove old storage
2637
    if not self.early_release:
2638
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2639
      self._RemoveOldStorage(self.target_node_uuid, iv_names)