root / lib / cmdlib / instance_storage.py @ 18397489


#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with storage of instances."""

import itertools
import logging
import os
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import ht
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import objects
from ganeti import utils
from ganeti import rpc
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
  AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \
  CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
  IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes, \
  CheckDiskTemplateEnabled
from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
  CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy

import ganeti.masterd.instance


_DISK_TEMPLATE_NAME_PREFIX = {
  constants.DT_PLAIN: "",
  constants.DT_RBD: ".rbd",
  constants.DT_EXT: ".ext",
  }


_DISK_TEMPLATE_DEVICE_TYPE = {
  constants.DT_PLAIN: constants.LD_LV,
  constants.DT_FILE: constants.LD_FILE,
  constants.DT_SHARED_FILE: constants.LD_FILE,
  constants.DT_BLOCK: constants.LD_BLOCKDEV,
  constants.DT_RBD: constants.LD_RBD,
  constants.DT_EXT: constants.LD_EXT,
  }


def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as an LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  """
  lu.cfg.SetDiskID(device, node_uuid)
  result = lu.rpc.call_blockdev_create(node_uuid, device, device.size,
                                       instance.name, force_open, info,
                                       excl_stor)
  result.Raise("Can't create block device %s on"
               " node %s for instance %s" % (device,
                                             lu.cfg.GetNodeName(node_uuid),
                                             instance.name))
  if device.physical_id is None:
    device.physical_id = result.payload


def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
                         info, force_open, excl_stor):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @attention: The device has to be annotated already.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has
      CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as an LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  @return: list of created devices
  """
  created_devices = []
  try:
    if device.CreateOnSecondary():
      force_create = True

    if device.children:
      for child in device.children:
        devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
                                    force_create, info, force_open, excl_stor)
        created_devices.extend(devs)

    if not force_create:
      return created_devices

    CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor)
    # The device has been completely created, so there is no point in keeping
    # its subdevices in the list. We just add the device itself instead.
    created_devices = [(node_uuid, device)]
    return created_devices

  except errors.DeviceCreationError, e:
    e.created_devices.extend(created_devices)
    raise e
  except errors.OpExecError, e:
    raise errors.DeviceCreationError(str(e), created_devices)


def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid):
  """Whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node_uuid: string
  @param node_uuid: The node UUID
  @rtype: bool
  @return: The effective value of exclusive_storage
  @raise errors.OpPrereqError: if no node exists with the given UUID

  """
  ni = cfg.GetNodeInfo(node_uuid)
  if ni is None:
    raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
                               errors.ECODE_NOENT)
  return IsExclusiveStorageEnabledNode(cfg, ni)


def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
                    force_open):
  """Wrapper around L{_CreateBlockDevInner}.

  This method annotates the root device first.

  """
  (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
  excl_stor = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
  return _CreateBlockDevInner(lu, node_uuid, instance, disk, force_create, info,
                              force_open, excl_stor)


def _UndoCreateDisks(lu, disks_created):
  """Undo the work performed by L{CreateDisks}.

  This function is called in case of an error to undo the work of
  L{CreateDisks}.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @param disks_created: the result returned by L{CreateDisks}

  """
  for (node_uuid, disk) in disks_created:
    lu.cfg.SetDiskID(disk, node_uuid)
    result = lu.rpc.call_blockdev_remove(node_uuid, disk)
    result.Warn("Failed to remove newly-created disk %s on node %s" %
                (disk, lu.cfg.GetNodeName(node_uuid)), logging.warning)


def CreateDisks(lu, instance, to_skip=None, target_node_uuid=None, disks=None):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @type to_skip: list
  @param to_skip: list of indices to skip
  @type target_node_uuid: string
  @param target_node_uuid: if passed, overrides the target node for creation
  @type disks: list of L{objects.Disk}
  @param disks: the disks to create; if not specified, all the disks of the
      instance are created
  @return: information about the created disks, to be used to call
      L{_UndoCreateDisks}
  @raise errors.OpPrereqError: in case of error

  """
  info = GetInstanceInfoText(instance)
  if target_node_uuid is None:
    pnode_uuid = instance.primary_node
    all_node_uuids = instance.all_nodes
  else:
    pnode_uuid = target_node_uuid
    all_node_uuids = [pnode_uuid]

  if disks is None:
    disks = instance.disks

  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), instance.disk_template)

  if instance.disk_template in constants.DTS_FILEBASED:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir)

    result.Raise("Failed to create directory '%s' on"
                 " node %s" % (file_storage_dir,
                               lu.cfg.GetNodeName(pnode_uuid)))

  disks_created = []
  for idx, device in enumerate(disks):
    if to_skip and idx in to_skip:
      continue
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
    for node_uuid in all_node_uuids:
      f_create = node_uuid == pnode_uuid
      try:
        _CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
                        f_create)
        disks_created.append((node_uuid, device))
      except errors.DeviceCreationError, e:
        logging.warning("Creating disk %s for instance '%s' failed",
                        idx, instance.name)
        disks_created.extend(e.created_devices)
        _UndoCreateDisks(lu, disks_created)
        raise errors.OpExecError(e.message)
  return disks_created


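# Illustrative example for ComputeDiskSizePerVG below: two 1024 MiB plain
# (LVM) disks in a volume group named "xenvg" yield {"xenvg": 2048}; for
# DT_DRBD8, constants.DRBD_META_SIZE is additionally charged per disk.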
def ComputeDiskSizePerVG(disk_template, disks):
  """Compute disk size requirements in the volume group.

  """
  def _compute(disks, payload):
    """Universal algorithm.

    """
    vgs = {}
    for disk in disks:
      vgs[disk[constants.IDISK_VG]] = \
        vgs.get(disk[constants.IDISK_VG], 0) + disk[constants.IDISK_SIZE] + \
        payload

    return vgs

  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: {},
    constants.DT_PLAIN: _compute(disks, 0),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
    constants.DT_FILE: {},
    constants.DT_SHARED_FILE: {},
    }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]


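# ComputeDisks below turns the opcode's disk specifications into normalized
# dicts; an input entry looks like (illustrative values)
#   {constants.IDISK_SIZE: 1024, constants.IDISK_MODE: constants.DISK_RDWR}
# and may carry IDISK_VG, IDISK_NAME, IDISK_METAVG, IDISK_ADOPT,
# IDISK_SPINDLES and, for DT_EXT, IDISK_PROVIDER plus provider parameters.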
def ComputeDisks(op, default_vg):
  """Computes the instance disks.

  @param op: The instance opcode
  @param default_vg: The default_vg to assume

  @return: The computed disks

  """
  disks = []
  for disk in op.disks:
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
    if mode not in constants.DISK_ACCESS_SET:
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                 mode, errors.ECODE_INVAL)
    size = disk.get(constants.IDISK_SIZE, None)
    if size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      size = int(size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                 errors.ECODE_INVAL)

    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
    if ext_provider and op.disk_template != constants.DT_EXT:
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
                                 " disk template, not %s" %
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
                                  op.disk_template), errors.ECODE_INVAL)

    data_vg = disk.get(constants.IDISK_VG, default_vg)
    name = disk.get(constants.IDISK_NAME, None)
    if name is not None and name.lower() == constants.VALUE_NONE:
      name = None
    new_disk = {
      constants.IDISK_SIZE: size,
      constants.IDISK_MODE: mode,
      constants.IDISK_VG: data_vg,
      constants.IDISK_NAME: name,
      }

    for key in [
      constants.IDISK_METAVG,
      constants.IDISK_ADOPT,
      constants.IDISK_SPINDLES,
      ]:
      if key in disk:
        new_disk[key] = disk[key]

    # For extstorage, demand the `provider' option and add any
    # additional parameters (ext-params) to the dict
    if op.disk_template == constants.DT_EXT:
      if ext_provider:
        new_disk[constants.IDISK_PROVIDER] = ext_provider
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            new_disk[key] = disk[key]
      else:
        raise errors.OpPrereqError("Missing provider for template '%s'" %
                                   constants.DT_EXT, errors.ECODE_INVAL)

    disks.append(new_disk)

  return disks


def CheckRADOSFreeSpace():
  """Compute disk size requirements inside the RADOS cluster.

  """
  # For the RADOS cluster we assume there is always enough space.
  pass


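# _GenerateDRBD8Branch builds a DRBD8 disk whose logical_id is the tuple
# (primary_uuid, secondary_uuid, port, p_minor, s_minor, shared_secret),
# backed by two LVs: one for the data and one of DRBD_META_SIZE for metadata.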
def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
                         iv_name, p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  assert len(vgnames) == len(names) == 2
  port = lu.cfg.AllocatePort()
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())

  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgnames[0], names[0]),
                          params={})
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  dev_meta = objects.Disk(dev_type=constants.LD_LV,
                          size=constants.DRBD_META_SIZE,
                          logical_id=(vgnames[1], names[1]),
                          params={})
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary_uuid, secondary_uuid, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name, params={})
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  return drbd_dev


def GenerateDiskTemplate(
  lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
  disk_info, file_storage_dir, file_driver, base_index,
  feedback_fn, full_disk_params):
  """Generate the entire disk layout for a given template type.

  """
  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []

  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), template_name)

  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_DRBD8:
    if len(secondary_node_uuids) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node_uuid = secondary_node_uuids[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node_uuid, remote_node_uuid] * len(disk_info), instance_uuid)

    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
                                                       full_disk_params)
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]

    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      data_vg = disk.get(constants.IDISK_VG, vgname)
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
      disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
                                      disk[constants.IDISK_SIZE],
                                      [data_vg, meta_vg],
                                      names[idx * 2:idx * 2 + 2],
                                      "disk/%d" % disk_index,
                                      minors[idx * 2], minors[idx * 2 + 1])
      disk_dev.mode = disk[constants.IDISK_MODE]
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disks.append(disk_dev)
  else:
    if secondary_node_uuids:
      raise errors.ProgrammerError("Wrong template configuration")

    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
    if name_prefix is None:
      names = None
    else:
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
                                        (name_prefix, base_index + i)
                                        for i in range(disk_count)])

    if template_name == constants.DT_PLAIN:

      def logical_id_fn(idx, _, disk):
        vg = disk.get(constants.IDISK_VG, vgname)
        return (vg, names[idx])

    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
      logical_id_fn = \
        lambda _, disk_index, disk: (file_driver,
                                     "%s/disk%d" % (file_storage_dir,
                                                    disk_index))
    elif template_name == constants.DT_BLOCK:
      logical_id_fn = \
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
                                       disk[constants.IDISK_ADOPT])
    elif template_name == constants.DT_RBD:
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
    elif template_name == constants.DT_EXT:
      def logical_id_fn(idx, _, disk):
        provider = disk.get(constants.IDISK_PROVIDER, None)
        if provider is None:
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
                                       " not found" %
                                       (constants.DT_EXT,
                                        constants.IDISK_PROVIDER))
        return (provider, names[idx])
    else:
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)

    dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]

    for idx, disk in enumerate(disk_info):
      params = {}
      # Only for the Ext template add disk_info to params
      if template_name == constants.DT_EXT:
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            params[key] = disk[key]
      disk_index = idx + base_index
      size = disk[constants.IDISK_SIZE]
      feedback_fn("* disk %s, size %s" %
                  (disk_index, utils.FormatUnit(size, "h")))
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
                              logical_id=logical_id_fn(idx, disk_index, disk),
                              iv_name="disk/%d" % disk_index,
                              mode=disk[constants.IDISK_MODE],
                              params=params,
                              spindles=disk.get(constants.IDISK_SPINDLES))
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
      disks.append(disk_dev)

  return disks


def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
  """Check the presence of the spindle options with exclusive_storage.

  @type diskdict: dict
  @param diskdict: disk parameters
  @type es_flag: bool
  @param es_flag: the effective value of the exclusive_storage flag
  @type required: bool
  @param required: whether spindles are required or just optional
  @raise errors.OpPrereqError: when spindles are given and they should not be

  """
  if (not es_flag and constants.IDISK_SPINDLES in diskdict and
      diskdict[constants.IDISK_SPINDLES] is not None):
    raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
                               " when exclusive storage is not active",
                               errors.ECODE_INVAL)
  if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or
                                diskdict[constants.IDISK_SPINDLES] is None)):
    raise errors.OpPrereqError("You must specify spindles in instance disks"
                               " when exclusive storage is active",
                               errors.ECODE_INVAL)


class LUInstanceRecreateDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  _MODIFYABLE = compat.UniqueFrozenset([
    constants.IDISK_SIZE,
    constants.IDISK_MODE,
    constants.IDISK_SPINDLES,
    ])

  # New or changed disk parameters may have different semantics
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
    constants.IDISK_ADOPT,

    # TODO: Implement support changing VG while recreating
    constants.IDISK_VG,
    constants.IDISK_METAVG,
    constants.IDISK_PROVIDER,
    constants.IDISK_NAME,
    ]))

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)

    # FIXME
    # The allocator should actually run in "relocate" mode, but current
    # allocators don't support relocating all the nodes of an instance at
    # the same time. As a workaround we use "allocate" mode, but this is
    # suboptimal for two reasons:
    # - The instance name passed to the allocator is present in the list of
    #   existing instances, so there could be a conflict within the
    #   internal structures of the allocator. This doesn't happen with the
    #   current allocators, but it's a liability.
    # - The allocator counts the resources used by the instance twice: once
    #   because the instance exists already, and once because it tries to
    #   allocate a new instance.
    # The allocator could choose some of the nodes on which the instance is
    # running, but that's not a problem. If the instance nodes are broken,
    # they should already be marked as drained or offline, and hence
    # skipped by the allocator. If instance disks have been lost for other
    # reasons, then recreating the disks on the same nodes should be fine.
    disk_template = self.instance.disk_template
    spindle_use = be_full[constants.BE_SPINDLE_USE]
    disks = [{
      constants.IDISK_SIZE: d.size,
      constants.IDISK_MODE: d.mode,
      constants.IDISK_SPINDLES: d.spindles,
      } for d in self.instance.disks]
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
                                        disk_template=disk_template,
                                        tags=list(self.instance.GetTags()),
                                        os=self.instance.os,
                                        nics=[{}],
                                        vcpus=be_full[constants.BE_VCPUS],
                                        memory=be_full[constants.BE_MAXMEM],
                                        spindle_use=spindle_use,
                                        disks=disks,
                                        hypervisor=self.instance.hypervisor,
                                        node_whitelist=None)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    assert req.RequiredNodes() == len(self.instance.all_nodes)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 utils.CommaJoin(self.op.nodes))

  def CheckArguments(self):
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
      # Normalize and convert deprecated list of disk indices
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]

    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
    if duplicates:
      raise errors.OpPrereqError("Some disks have been specified more than"
                                 " once: %s" % utils.CommaJoin(duplicates),
                                 errors.ECODE_INVAL)

    # We don't want CheckIAllocatorOrNode selecting the default iallocator
    # when neither iallocator nor nodes are specified
    if self.op.iallocator or self.op.nodes:
      CheckIAllocatorOrNode(self, "iallocator", "nodes")

    for (idx, params) in self.op.disks:
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
      if unsupported:
        raise errors.OpPrereqError("Parameters for disk %s try to change"
                                   " unmodifiable parameter(s): %s" %
                                   (idx, utils.CommaJoin(unsupported)),
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    if self.op.nodes:
      (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      if self.op.iallocator:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.iallocator is not None
      assert not self.op.nodes
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock the primary group used by the instance optimistically; this
      # requires going via the node before it's locked, requiring
      # verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, primary_only=True)

    elif level == locking.LEVEL_NODE:
      # If an allocator is used, then we lock all the nodes in the current
      # instance group, as we don't know yet which ones will be selected;
      # if we replace the nodes without using an allocator, locks are
      # already declared in ExpandNames; otherwise, we need to lock all the
      # instance nodes for disk re-creation
      if self.op.iallocator:
        assert not self.op.nodes
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1

        # Lock member nodes of the group of the primary node
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
          self.needed_locks[locking.LEVEL_NODE].extend(
            self.cfg.GetNodeGroup(group_uuid).members)

        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
      elif not self.op.nodes:
        self._LockInstancesNodes(primary_only=False)
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    return BuildInstanceHookEnvByObject(self, self.instance)

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    if self.op.node_uuids:
      if len(self.op.node_uuids) != len(instance.all_nodes):
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
                                   " %d replacement nodes were specified" %
                                   (instance.name, len(instance.all_nodes),
                                    len(self.op.node_uuids)),
                                   errors.ECODE_INVAL)
      assert instance.disk_template != constants.DT_DRBD8 or \
             len(self.op.node_uuids) == 2
      assert instance.disk_template != constants.DT_PLAIN or \
             len(self.op.node_uuids) == 1
      primary_node = self.op.node_uuids[0]
    else:
      primary_node = instance.primary_node
    if not self.op.iallocator:
      CheckNodeOnline(self, primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name, errors.ECODE_INVAL)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      # Node group locks are acquired only for the primary node (and only
      # when the allocator is used)
      CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
                              primary_only=True)

    # if we replace nodes *and* the old primary is offline, we don't
    # check the instance state
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
    if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                         msg="cannot recreate disks")

    if self.op.disks:
      self.disks = dict(self.op.disks)
    else:
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))

    maxidx = max(self.disks.keys())
    if maxidx >= len(instance.disks):
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
                                 errors.ECODE_INVAL)

    if ((self.op.node_uuids or self.op.iallocator) and
         sorted(self.disks.keys()) != range(len(instance.disks))):
      raise errors.OpPrereqError("Can't recreate disks partially and"
                                 " change the nodes at the same time",
                                 errors.ECODE_INVAL)

    self.instance = instance

    if self.op.iallocator:
      self._RunAllocator()
      # Release unneeded node and node resource locks
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)

    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

    if self.op.node_uuids:
      node_uuids = self.op.node_uuids
    else:
      node_uuids = instance.all_nodes
    excl_stor = compat.any(
      rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
      )
    for new_params in self.disks.values():
      CheckSpindlesExclusiveStorage(new_params, excl_stor, False)

  def Exec(self, feedback_fn):
    """Recreate the disks.

    """
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    to_skip = []
    mods = [] # keeps track of needed changes

    for idx, disk in enumerate(self.instance.disks):
      try:
        changes = self.disks[idx]
      except KeyError:
        # Disk should not be recreated
        to_skip.append(idx)
        continue

      # update secondaries for disks, if needed
      if self.op.node_uuids and disk.dev_type == constants.LD_DRBD8:
        # need to update the nodes and minors
        assert len(self.op.node_uuids) == 2
        assert len(disk.logical_id) == 6 # otherwise disk internals
                                         # have changed
        (_, _, old_port, _, _, old_secret) = disk.logical_id
        new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
                                                self.instance.uuid)
        new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
                  new_minors[0], new_minors[1], old_secret)
        assert len(disk.logical_id) == len(new_id)
      else:
        new_id = None

      mods.append((idx, new_id, changes))

    # now that we have passed all asserts above, we can apply the mods
    # in a single run (to avoid partial changes)
    for idx, new_id, changes in mods:
      disk = self.instance.disks[idx]
      if new_id is not None:
        assert disk.dev_type == constants.LD_DRBD8
        disk.logical_id = new_id
      if changes:
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
                    mode=changes.get(constants.IDISK_MODE, None),
                    spindles=changes.get(constants.IDISK_SPINDLES, None))

    # change primary node, if needed
    if self.op.node_uuids:
      self.instance.primary_node = self.op.node_uuids[0]
      self.LogWarning("Changing the instance's nodes, you will have to"
                      " remove any disks left on the older nodes manually")

    if self.op.node_uuids:
      self.cfg.Update(self.instance, feedback_fn)

    # All touched nodes must be locked
    mylocks = self.owned_locks(locking.LEVEL_NODE)
    assert mylocks.issuperset(frozenset(self.instance.all_nodes))
    new_disks = CreateDisks(self, self.instance, to_skip=to_skip)

    # TODO: Release node locks before wiping, or explain why it's not possible
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
      wipedisks = [(idx, disk, 0)
                   for (idx, disk) in enumerate(self.instance.disks)
                   if idx not in to_skip]
      WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
                         cleanup=new_disks)


def _PerformNodeInfoCall(lu, node_uuids, vg):
  """Prepares the input and performs a node info call.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: list of string
  @param node_uuids: list of node UUIDs to perform the call for
  @type vg: string
  @param vg: the volume group's name

  """
  lvm_storage_units = [(constants.ST_LVM_VG, vg)]
  storage_units = rpc.PrepareStorageUnitsForNodes(lu.cfg, lvm_storage_units,
                                                  node_uuids)
  hvname = lu.cfg.GetHypervisorType()
  hvparams = lu.cfg.GetClusterInfo().hvparams
  nodeinfo = lu.rpc.call_node_info(node_uuids, storage_units,
                                   [(hvname, hvparams[hvname])])
  return nodeinfo


def _CheckVgCapacityForNode(node_name, node_info, vg, requested):
  """Checks the vg capacity for a given node.

  @type node_info: tuple (_, list of dicts, _)
  @param node_info: the result of the node info call for one node
  @type node_name: string
  @param node_name: the name of the node
  @type vg: string
  @param vg: volume group name
  @type requested: int
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  (_, space_info, _) = node_info
  lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType(
      space_info, constants.ST_LVM_VG)
  if not lvm_vg_info:
    raise errors.OpPrereqError("Can't retrieve storage information for LVM")
  vg_free = lvm_vg_info.get("storage_free", None)
  if not isinstance(vg_free, int):
    raise errors.OpPrereqError("Can't compute free disk space on node"
                               " %s for vg %s, result was '%s'" %
                               (node_name, vg, vg_free), errors.ECODE_ENVIRON)
  if requested > vg_free:
    raise errors.OpPrereqError("Not enough disk space on target node %s"
                               " vg %s: required %d MiB, available %d MiB" %
                               (node_name, vg, requested, vg_free),
                               errors.ECODE_NORES)


def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
  """Checks if nodes have enough free disk space in the specified VG.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type vg: C{str}
  @param vg: the volume group to check
  @type requested: C{int}
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  nodeinfo = _PerformNodeInfoCall(lu, node_uuids, vg)
  for node in node_uuids:
    node_name = lu.cfg.GetNodeName(node)
    info = nodeinfo[node]
    info.Raise("Cannot get current information from node %s" % node_name,
               prereq=True, ecode=errors.ECODE_ENVIRON)
    _CheckVgCapacityForNode(node_name, info.payload, vg, requested)


def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
  """Checks if nodes have enough free disk space in all the VGs.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type req_sizes: C{dict}
  @param req_sizes: the hash of vg and corresponding amount of disk in
      MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  for vg, req_size in req_sizes.items():
    _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, req_size)


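# Rounding behaviour of the helper below, e.g.: 1073741825 bytes (1 GiB plus
# one byte) is rounded up to 1025 MiB and a warning is logged, so that no
# existing data beyond the last full mebibyte gets overwritten.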
def _DiskSizeInBytesToMebibytes(lu, size):
  """Converts a disk size in bytes to mebibytes.

  Warns and rounds up if the size isn't an even multiple of 1 MiB.

  """
  (mib, remainder) = divmod(size, 1024 * 1024)

  if remainder != 0:
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
                  " to not overwrite existing data (%s bytes will not be"
                  " wiped)", (1024 * 1024) - remainder)
    mib += 1

  return mib


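# Worked example for _CalcEta below: if 512 MiB out of 2048 MiB were written
# in 30 seconds, the average is 30/512 s/MiB and the remaining 1536 MiB give
# an ETA of 90 seconds.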
def _CalcEta(time_taken, written, total_size):
  """Calculates the ETA based on size written and total size.

  @param time_taken: The time taken so far
  @param written: amount written so far
  @param total_size: The total size of data to be written
  @return: The remaining time in seconds

  """
  avg_time = time_taken / float(written)
  return (total_size - written) * avg_time


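# WipeDisks pauses DRBD synchronization, wipes each disk in chunks and resumes
# synchronization in the finally block even if wiping fails. The chunk size is
# MIN_WIPE_CHUNK_PERCENT of the disk size, capped at MAX_WIPE_CHUNK; e.g. with
# a 10% minimum and a 1 GiB cap (illustrative values), a 4 GiB disk is wiped
# in roughly 409 MiB chunks.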
def WipeDisks(lu, instance, disks=None):
  """Wipes instance disks.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
  @param disks: Disk details; tuple contains disk index, disk object and the
    start offset

  """
  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  if disks is None:
    disks = [(idx, disk, 0)
             for (idx, disk) in enumerate(instance.disks)]

  for (_, device, _) in disks:
    lu.cfg.SetDiskID(device, node_uuid)

  logging.info("Pausing synchronization of disks of instance '%s'",
               instance.name)
  result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                  (map(compat.snd, disks),
                                                   instance),
                                                  True)
  result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)

  for idx, success in enumerate(result.payload):
    if not success:
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
                   " failed", idx, instance.name)

  try:
    for (idx, device, offset) in disks:
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
      wipe_chunk_size = \
        int(min(constants.MAX_WIPE_CHUNK,
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))

      size = device.size
      last_output = 0
      start_time = time.time()

      if offset == 0:
        info_text = ""
      else:
        info_text = (" (from %s to %s)" %
                     (utils.FormatUnit(offset, "h"),
                      utils.FormatUnit(size, "h")))

      lu.LogInfo("* Wiping disk %s%s", idx, info_text)

      logging.info("Wiping disk %d for instance %s on node %s using"
                   " chunk size %s", idx, instance.name, node_name,
                   wipe_chunk_size)

      while offset < size:
        wipe_size = min(wipe_chunk_size, size - offset)

        logging.debug("Wiping disk %d, offset %s, chunk %s",
                      idx, offset, wipe_size)

        result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
                                           offset, wipe_size)
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
                     (idx, offset, wipe_size))

        now = time.time()
        offset += wipe_size
        if now - last_output >= 60:
          eta = _CalcEta(now - start_time, offset, size)
          lu.LogInfo(" - done: %.1f%% ETA: %s",
                     offset / float(size) * 100, utils.FormatSeconds(eta))
          last_output = now
  finally:
    logging.info("Resuming synchronization of disks for instance '%s'",
                 instance.name)

    result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                    (map(compat.snd, disks),
                                                     instance),
                                                    False)

    if result.fail_msg:
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
                    node_name, result.fail_msg)
    else:
      for idx, success in enumerate(result.payload):
        if not success:
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
                        " failed", idx, instance.name)


def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
  """Wrapper for L{WipeDisks} that handles errors.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @param disks: see L{WipeDisks}
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
      case of error
  @raise errors.OpExecError: in case of failure

  """
  try:
    WipeDisks(lu, instance, disks=disks)
  except errors.OpExecError:
    logging.warning("Wiping disks for instance '%s' failed",
                    instance.name)
    _UndoCreateDisks(lu, cleanup)
    raise


def ExpandCheckDisks(instance, disks):
  """Return the instance disks selected by the disks list

  @type disks: list of L{objects.Disk} or None
  @param disks: selected disks
  @rtype: list of L{objects.Disk}
  @return: selected instance disks to act on

  """
  if disks is None:
    return instance.disks
  else:
    if not set(disks).issubset(instance.disks):
      raise errors.ProgrammerError("Can only act on disks belonging to the"
                                   " target instance: expected a subset of %r,"
                                   " got %r" % (instance.disks, disks))
    return disks


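# WaitForSync polls the mirror status on the primary node, sleeping up to 60
# seconds between polls; it gives up after ten consecutive RPC failures and
# returns False if any disk is still degraded when the loop ends.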
def WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)

  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  for dev in disks:
    lu.cfg.SetDiskID(dev, node_uuid)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node_name)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node_name, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.LogInfo("- device %s: %5.2f%% done, %s",
                   disks[i].iv_name, mstat.sync_percent, rem_time)

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.LogInfo("Instance %s's disks are in sync", instance.name)

  return not cumul_degraded


def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  Errors on the primary node are ignored only if ignore_primary is
  true.

  """
  lu.cfg.MarkInstanceDisksInactive(instance.uuid)
  all_result = True
  disks = ExpandCheckDisks(instance, disks)

  for disk in disks:
    for node_uuid, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node_uuid)
      result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        if ((node_uuid == instance.primary_node and not ignore_primary) or
            (node_uuid != instance.primary_node and not result.offline)):
          all_result = False
  return all_result


def _SafeShutdownInstanceDisks(lu, instance, disks=None):
  """Shutdown block devices of an instance.

  This function checks that the instance is not running before calling
  L{ShutdownInstanceDisks}.

  """
  CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
  ShutdownInstanceDisks(lu, instance, disks=disks)


def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
                          ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type disks: list of L{objects.Disk} or None
  @param disks: which disks to assemble (or all, if None)
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: False if the operation failed, otherwise a list of
      (host, instance_visible_name, node_visible_name)
      with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  disks = ExpandCheckDisks(instance, disks)

  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # mark instance disks as active before doing actual work, so watcher does
  # not try to shut them down erroneously
  lu.cfg.MarkInstanceDisksActive(instance.uuid)

  # 1st pass, assemble on all nodes in secondary mode
  for idx, inst_disk in enumerate(disks):
    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
                                  instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node_uuid)
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                             instance.name, False, idx)
      msg = result.fail_msg
      if msg:
        is_offline_secondary = (node_uuid in instance.secondary_nodes and
                                result.offline)
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=False, pass=1): %s",
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        if not (ignore_secondaries or is_offline_secondary):
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for idx, inst_disk in enumerate(disks):
    dev_path = None

    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
                                  instance.primary_node):
      if node_uuid != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node_uuid)
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                             instance.name, True, idx)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=True, pass=2): %s",
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        disks_ok = False
      else:
        dev_path = result.payload

    device_info.append((lu.cfg.GetNodeName(instance.primary_node),
                        inst_disk.iv_name, dev_path))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  if not disks_ok:
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)

  return disks_ok, device_info


1362
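# Illustrative note (not part of the original module): a typical caller
# unpacks the AssembleInstanceDisks() result as sketched below; the sketch
# assumes an LU object `lu`, a locked instance object `inst` and a
# `feedback_fn` callable, as in the Exec() methods elsewhere in this file.
#
#   disks_ok, device_info = AssembleInstanceDisks(lu, inst)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")
#   for node_name, iv_name, dev_path in device_info:
#     feedback_fn("%s: %s is visible as %s" % (node_name, iv_name, dev_path))
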
def StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = AssembleInstanceDisks(lu, instance,
                                      ignore_secondaries=force)
  if not disks_ok:
    ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.LogWarning("",
                    hint=("If the message above refers to a secondary node,"
                          " you can retry the operation using '--force'"))
    raise errors.OpExecError("Disk consistency error")


class LUInstanceGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.needed_locks[locking.LEVEL_NODE_RES] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      "ABSOLUTE": self.op.absolute,
      }
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    node_uuids = list(self.instance.all_nodes)
    for node_uuid in node_uuids:
      CheckNodeOnline(self, node_uuid)
    self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)

    if self.instance.disk_template not in constants.DTS_GROWABLE:
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing", errors.ECODE_INVAL)

    self.disk = self.instance.FindDisk(self.op.disk)

    if self.op.absolute:
      self.target = self.op.amount
      self.delta = self.target - self.disk.size
      if self.delta < 0:
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
                                   "current disk size (%s)" %
                                   (utils.FormatUnit(self.target, "h"),
                                    utils.FormatUnit(self.disk.size, "h")),
                                   errors.ECODE_STATE)
    else:
      self.delta = self.op.amount
      self.target = self.disk.size + self.delta
      if self.delta < 0:
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
                                   utils.FormatUnit(self.delta, "h"),
                                   errors.ECODE_INVAL)

    self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta))

  def _CheckDiskSpace(self, node_uuids, req_vgspace):
    template = self.instance.disk_template
    if (template not in (constants.DTS_NO_FREE_SPACE_CHECK) and
        not any(self.node_es_flags.values())):
      # TODO: check the free disk space for file, when that feature will be
      # supported
      # With exclusive storage we need to do something smarter than just looking
      # at free space, which, in the end, is basically a dry run. So we rely on
      # the dry run performed in Exec() instead.
      CheckNodesFreeDiskPerVG(self, node_uuids, req_vgspace)

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks

    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[self.disk])
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block device to grow")

    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
                (self.op.disk, self.instance.name,
                 utils.FormatUnit(self.delta, "h"),
                 utils.FormatUnit(self.target, "h")))

    # First run all grow ops in dry-run mode
    for node_uuid in self.instance.all_nodes:
      self.cfg.SetDiskID(self.disk, node_uuid)
      result = self.rpc.call_blockdev_grow(node_uuid,
                                           (self.disk, self.instance),
                                           self.delta, True, True,
                                           self.node_es_flags[node_uuid])
      result.Raise("Dry-run grow request failed to node %s" %
                   self.cfg.GetNodeName(node_uuid))

    if wipe_disks:
      # Get disk size from primary node for wiping
      self.cfg.SetDiskID(self.disk, self.instance.primary_node)
      result = self.rpc.call_blockdev_getdimensions(self.instance.primary_node,
                                                    [self.disk])
      result.Raise("Failed to retrieve disk size from node '%s'" %
                   self.instance.primary_node)

      (disk_dimensions, ) = result.payload

      if disk_dimensions is None:
        raise errors.OpExecError("Failed to retrieve disk size from primary"
                                 " node '%s'" % self.instance.primary_node)
      (disk_size_in_bytes, _) = disk_dimensions

      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)

      assert old_disk_size >= self.disk.size, \
        ("Retrieved disk size too small (got %s, should be at least %s)" %
         (old_disk_size, self.disk.size))
    else:
      old_disk_size = None

    # We know that (as far as we can test) operations across different
    # nodes will succeed, time to run it for real on the backing storage
    for node_uuid in self.instance.all_nodes:
      self.cfg.SetDiskID(self.disk, node_uuid)
      result = self.rpc.call_blockdev_grow(node_uuid,
                                           (self.disk, self.instance),
                                           self.delta, False, True,
                                           self.node_es_flags[node_uuid])
      result.Raise("Grow request failed to node %s" %
                   self.cfg.GetNodeName(node_uuid))

    # And now execute it for logical storage, on the primary node
    node_uuid = self.instance.primary_node
    self.cfg.SetDiskID(self.disk, node_uuid)
    result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance),
                                         self.delta, False, False,
                                         self.node_es_flags[node_uuid])
    result.Raise("Grow request failed to node %s" %
                 self.cfg.GetNodeName(node_uuid))

    self.disk.RecordGrow(self.delta)
    self.cfg.Update(self.instance, feedback_fn)

    # Changes have been recorded, release node lock
    ReleaseLocks(self, locking.LEVEL_NODE)

    # Downgrade lock while waiting for sync
    self.glm.downgrade(locking.LEVEL_INSTANCE)

    assert wipe_disks ^ (old_disk_size is None)

    if wipe_disks:
      assert self.instance.disks[self.op.disk] == self.disk

      # Wipe newly added disk space
      WipeDisks(self, self.instance,
                disks=[(self.op.disk, self.disk, old_disk_size)])

    if self.op.wait_for_sync:
      disk_abort = not WaitForSync(self, self.instance, disks=[self.disk])
      if disk_abort:
        self.LogWarning("Disk syncing has not returned a good status; check"
                        " the instance")
      if not self.instance.disks_active:
        _SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk])
    elif not self.instance.disks_active:
      self.LogWarning("Not shutting down the disk even if the instance is"
                      " not supposed to be running because no wait for"
                      " sync mode was requested")

    assert self.owned_locks(locking.LEVEL_NODE_RES)
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)


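# Illustrative note (not part of the original module): LUInstanceGrowDisk
# accepts either a relative or an absolute amount (disk sizes are in MiB).
# For a disk whose current size is 10240, CheckPrereq() above computes:
#
#   absolute=False, amount=2048  ->  delta = 2048,  target = 12288
#   absolute=True,  amount=20480 ->  delta = 10240, target = 20480
#
# A negative delta is rejected in both branches before any RPC is made.
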
class LUInstanceReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check arguments.

    """
    if self.op.mode == constants.REPLACE_DISK_CHG:
      if self.op.remote_node is None and self.op.iallocator is None:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given", errors.ECODE_INVAL)
      else:
        CheckIAllocatorOrNode(self, "iallocator", "remote_node")

    elif self.op.remote_node is not None or self.op.iallocator is not None:
      # Not replacing the secondary
      raise errors.OpPrereqError("The iallocator and new node options can"
                                 " only be used when changing the"
                                 " secondary node", errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    assert locking.LEVEL_NODE not in self.needed_locks
    assert locking.LEVEL_NODE_RES not in self.needed_locks
    assert locking.LEVEL_NODEGROUP not in self.needed_locks

    assert self.op.iallocator is None or self.op.remote_node is None, \
      "Conflicting options"

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8Dev.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

      if self.op.iallocator is not None:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

    self.replacer = TLReplaceDisks(self, self.op.instance_uuid,
                                   self.op.instance_name, self.op.mode,
                                   self.op.iallocator, self.op.remote_node_uuid,
                                   self.op.disks, self.op.early_release,
                                   self.op.ignore_ipolicy)

    self.tasklets = [self.replacer]

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.remote_node_uuid is None
      assert self.op.iallocator is not None
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock all groups used by instance optimistically; this requires going
      # via the node before it's locked, requiring verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid)

    elif level == locking.LEVEL_NODE:
      if self.op.iallocator is not None:
        assert self.op.remote_node_uuid is None
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)

        # Lock member nodes of all locked groups
        self.needed_locks[locking.LEVEL_NODE] = \
          [node_uuid
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
           for node_uuid in self.cfg.GetNodeGroup(group_uuid).members]
      else:
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

        self._LockInstancesNodes()

    elif level == locking.LEVEL_NODE_RES:
      # Reuse node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        self.needed_locks[locking.LEVEL_NODE]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    instance = self.replacer.instance
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.cfg.GetNodeName(instance.secondary_nodes[0]),
      }
    env.update(BuildInstanceHookEnvByObject(self, instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self.replacer.instance
    nl = [
      self.cfg.GetMasterNode(),
      instance.primary_node,
      ]
    if self.op.remote_node_uuid is not None:
      nl.append(self.op.remote_node_uuid)
    return nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
            self.op.iallocator is None)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      CheckInstanceNodeGroups(self.cfg, self.op.instance_uuid, owned_groups)

    return LogicalUnit.CheckPrereq(self)


class LUInstanceActivateDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              AssembleInstanceDisks(self, self.instance,
                                    ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    if self.op.wait_for_sync:
      if not WaitForSync(self, self.instance):
        self.cfg.MarkInstanceDisksInactive(self.instance.uuid)
        raise errors.OpExecError("Some disks of the instance are degraded!")

    return disks_info


class LUInstanceDeactivateDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    if self.op.force:
      ShutdownInstanceDisks(self, self.instance)
    else:
      _SafeShutdownInstanceDisks(self, self.instance)


def _CheckDiskConsistencyInner(lu, instance, dev, node_uuid, on_primary,
                               ldisk=False):
  """Check that mirrors are not degraded.

  @attention: The device has to be annotated already.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node_uuid)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node_uuid, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s",
                    lu.cfg.GetNodeName(node_uuid), msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid))
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistencyInner(lu, instance, child,
                                                     node_uuid, on_primary)

  return result


def CheckDiskConsistency(lu, instance, dev, node_uuid, on_primary, ldisk=False):
  """Wrapper around L{_CheckDiskConsistencyInner}.

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return _CheckDiskConsistencyInner(lu, instance, disk, node_uuid, on_primary,
                                    ldisk=ldisk)


def _BlockdevFind(lu, node_uuid, dev, instance):
  """Wrapper around call_blockdev_find to annotate diskparams.

  @param lu: A reference to the lu object
  @param node_uuid: The node to call out
  @param dev: The device to find
  @param instance: The instance object the device belongs to
  @returns The result of the rpc call

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return lu.rpc.call_blockdev_find(node_uuid, disk)


def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
    results.append("%s%s" % (new_id, val))
  return results


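# Illustrative note (not part of the original module): _GenerateUniqueNames()
# simply prepends a freshly generated unique ID to each extension, so a call
# like the one made from TLReplaceDisks._CreateNewStorage() below, e.g.
#
#   _GenerateUniqueNames(lu, [".disk0_data", ".disk0_meta"])
#
# returns names of the form "<unique-id>.disk0_data" and
# "<unique-id>.disk0_meta", which are then used as the new LV names.
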
class TLReplaceDisks(Tasklet):
  """Replaces disks for an instance.

  Note: Locking is not within the scope of this class.

  """
  def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name,
               remote_node_uuid, disks, early_release, ignore_ipolicy):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_uuid = instance_uuid
    self.instance_name = instance_name
    self.mode = mode
    self.iallocator_name = iallocator_name
    self.remote_node_uuid = remote_node_uuid
    self.disks = disks
    self.early_release = early_release
    self.ignore_ipolicy = ignore_ipolicy

    # Runtime data
    self.instance = None
    self.new_node_uuid = None
    self.target_node_uuid = None
    self.other_node_uuid = None
    self.remote_node_info = None
    self.node_secondary_ip = None

  @staticmethod
  def _RunAllocator(lu, iallocator_name, instance_uuid,
                    relocate_from_node_uuids):
    """Compute a new secondary node using an IAllocator.

    """
    req = iallocator.IAReqRelocate(
          inst_uuid=instance_uuid,
          relocate_from_node_uuids=list(relocate_from_node_uuids))
    ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)

    ial.Run(iallocator_name)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (iallocator_name, ial.info),
                                 errors.ECODE_NORES)

    remote_node_name = ial.result[0]
    remote_node = lu.cfg.GetNodeInfoByName(remote_node_name)

    if remote_node is None:
      raise errors.OpPrereqError("Node %s not found in configuration" %
                                 remote_node_name, errors.ECODE_NOENT)

    lu.LogInfo("Selected new secondary for instance '%s': %s",
               instance_uuid, remote_node_name)

    return remote_node.uuid

  def _FindFaultyDisks(self, node_uuid):
    """Wrapper for L{FindFaultyInstanceDisks}.

    """
    return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
                                   node_uuid, True)

  def _CheckDisksActivated(self, instance):
    """Checks if the instance disks are activated.

    @param instance: The instance to check disks
    @return: True if they are activated, False otherwise

    """
    node_uuids = instance.all_nodes

    for idx, dev in enumerate(instance.disks):
      for node_uuid in node_uuids:
        self.lu.LogInfo("Checking disk/%d on %s", idx,
                        self.cfg.GetNodeName(node_uuid))
        self.cfg.SetDiskID(dev, node_uuid)

        result = _BlockdevFind(self, node_uuid, dev, instance)

        if result.offline:
          continue
        elif result.fail_msg or not result.payload:
          return False

    return True

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.instance_name

    if self.instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances", errors.ECODE_INVAL)

    if len(self.instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(self.instance.secondary_nodes),
                                 errors.ECODE_FAULT)

    secondary_node_uuid = self.instance.secondary_nodes[0]

    if self.iallocator_name is None:
      remote_node_uuid = self.remote_node_uuid
    else:
      remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
                                            self.instance.uuid,
                                            self.instance.secondary_nodes)

    if remote_node_uuid is None:
      self.remote_node_info = None
    else:
      assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
             "Remote node '%s' is not locked" % remote_node_uuid

      self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node_uuid

    if remote_node_uuid == self.instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance", errors.ECODE_INVAL)

    if remote_node_uuid == secondary_node_uuid:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance",
                                 errors.ECODE_INVAL)

    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
                                    constants.REPLACE_DISK_CHG):
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
                                 errors.ECODE_INVAL)

    if self.mode == constants.REPLACE_DISK_AUTO:
      if not self._CheckDisksActivated(self.instance):
        raise errors.OpPrereqError("Please run activate-disks on instance %s"
                                   " first" % self.instance_name,
                                   errors.ECODE_STATE)
      faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
      faulty_secondary = self._FindFaultyDisks(secondary_node_uuid)

      if faulty_primary and faulty_secondary:
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
                                   " one node and can not be repaired"
                                   " automatically" % self.instance_name,
                                   errors.ECODE_STATE)

      if faulty_primary:
        self.disks = faulty_primary
        self.target_node_uuid = self.instance.primary_node
        self.other_node_uuid = secondary_node_uuid
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
      elif faulty_secondary:
        self.disks = faulty_secondary
        self.target_node_uuid = secondary_node_uuid
        self.other_node_uuid = self.instance.primary_node
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
      else:
        self.disks = []
        check_nodes = []

    else:
      # Non-automatic modes
      if self.mode == constants.REPLACE_DISK_PRI:
        self.target_node_uuid = self.instance.primary_node
        self.other_node_uuid = secondary_node_uuid
        check_nodes = [self.target_node_uuid, self.other_node_uuid]

      elif self.mode == constants.REPLACE_DISK_SEC:
        self.target_node_uuid = secondary_node_uuid
        self.other_node_uuid = self.instance.primary_node
        check_nodes = [self.target_node_uuid, self.other_node_uuid]

      elif self.mode == constants.REPLACE_DISK_CHG:
        self.new_node_uuid = remote_node_uuid
        self.other_node_uuid = self.instance.primary_node
        self.target_node_uuid = secondary_node_uuid
        check_nodes = [self.new_node_uuid, self.other_node_uuid]

        CheckNodeNotDrained(self.lu, remote_node_uuid)
        CheckNodeVmCapable(self.lu, remote_node_uuid)

        old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid)
        assert old_node_info is not None
        if old_node_info.offline and not self.early_release:
          # doesn't make sense to delay the release
          self.early_release = True
          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
                          " early-release mode", secondary_node_uuid)

      else:
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                     self.mode)

      # If not specified all disks should be replaced
      if not self.disks:
        self.disks = range(len(self.instance.disks))

    # TODO: This is ugly, but right now we can't distinguish between internally
    # submitted opcodes and external ones. We should fix that.
    if self.remote_node_info:
      # We change the node, let's verify it still meets instance policy
      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
      cluster = self.cfg.GetClusterInfo()
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              new_group_info)
      CheckTargetNodeIPolicy(self, ipolicy, self.instance,
                             self.remote_node_info, self.cfg,
                             ignore=self.ignore_ipolicy)

    for node_uuid in check_nodes:
      CheckNodeOnline(self.lu, node_uuid)

    touched_nodes = frozenset(node_uuid for node_uuid in [self.new_node_uuid,
                                                          self.other_node_uuid,
                                                          self.target_node_uuid]
                              if node_uuid is not None)

    # Release unneeded node and node resource locks
    ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    # Release any owned node group
    ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)

    # Check whether disks are valid
    for disk_idx in self.disks:
      self.instance.FindDisk(disk_idx)

    # Get secondary node IP addresses
    self.node_secondary_ip = dict((uuid, node.secondary_ip) for (uuid, node)
                                  in self.cfg.GetMultiNodeInfo(touched_nodes))

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    if __debug__:
      # Verify owned locks before starting operation
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
      assert set(owned_nodes) == set(self.node_secondary_ip), \
          ("Incorrect node locks, owning %s, expected %s" %
           (owned_nodes, self.node_secondary_ip.keys()))
      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
              self.lu.owned_locks(locking.LEVEL_NODE_RES))
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
      assert list(owned_instances) == [self.instance_name], \
          "Instance '%s' not locked" % self.instance_name

      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
          "Should not own any node group lock at this point"

    if not self.disks:
      feedback_fn("No disks need replacement for instance '%s'" %
                  self.instance.name)
      return

    feedback_fn("Replacing disk(s) %s for instance '%s'" %
                (utils.CommaJoin(self.disks), self.instance.name))
    feedback_fn("Current primary node: %s" %
                self.cfg.GetNodeName(self.instance.primary_node))
    feedback_fn("Current secondary node: %s" %
                utils.CommaJoin(self.cfg.GetNodeNames(
                                  self.instance.secondary_nodes)))

    activate_disks = not self.instance.disks_active

    # Activate the instance disks if we're replacing them on a down instance
    if activate_disks:
      StartInstanceDisks(self.lu, self.instance, True)

    try:
      # Should we replace the secondary node?
      if self.new_node_uuid is not None:
        fn = self._ExecDrbd8Secondary
      else:
        fn = self._ExecDrbd8DiskOnly

      result = fn(feedback_fn)
    finally:
      # Deactivate the instance disks if we're replacing them on a
      # down instance
      if activate_disks:
        _SafeShutdownInstanceDisks(self.lu, self.instance)

    assert not self.lu.owned_locks(locking.LEVEL_NODE)

    if __debug__:
      # Verify owned locks
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
      nodes = frozenset(self.node_secondary_ip)
      assert ((self.early_release and not owned_nodes) or
              (not self.early_release and not (set(owned_nodes) - nodes))), \
        ("Not owning the correct locks, early_release=%s, owned=%r,"
         " nodes=%r" % (self.early_release, owned_nodes, nodes))

    return result

  def _CheckVolumeGroup(self, node_uuids):
    self.lu.LogInfo("Checking volume groups")

    vgname = self.cfg.GetVGName()

    # Make sure volume group exists on all involved nodes
    results = self.rpc.call_vg_list(node_uuids)
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")

    for node_uuid in node_uuids:
      res = results[node_uuid]
      res.Raise("Error checking node %s" % self.cfg.GetNodeName(node_uuid))
      if vgname not in res.payload:
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
                                 (vgname, self.cfg.GetNodeName(node_uuid)))

  def _CheckDisksExistence(self, node_uuids):
    # Check disk existence
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      for node_uuid in node_uuids:
        self.lu.LogInfo("Checking disk/%d on %s", idx,
                        self.cfg.GetNodeName(node_uuid))
        self.cfg.SetDiskID(dev, node_uuid)

        result = _BlockdevFind(self, node_uuid, dev, self.instance)

        msg = result.fail_msg
        if msg or not result.payload:
          if not msg:
            msg = "disk not found"
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                   (idx, self.cfg.GetNodeName(node_uuid), msg))

  def _CheckDisksConsistency(self, node_uuid, on_primary, ldisk):
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                      (idx, self.cfg.GetNodeName(node_uuid)))

      if not CheckDiskConsistency(self.lu, self.instance, dev, node_uuid,
                                  on_primary, ldisk=ldisk):
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                                 " replace disks for instance %s" %
                                 (self.cfg.GetNodeName(node_uuid),
                                  self.instance.name))

  def _CreateNewStorage(self, node_uuid):
    """Create new storage on the primary or secondary node.

    This is only used for same-node replaces, not for changing the
    secondary node, hence we don't want to modify the existing disk.

    """
    iv_names = {}

    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    for idx, dev in enumerate(disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Adding storage on %s for disk/%d",
                      self.cfg.GetNodeName(node_uuid), idx)

      self.cfg.SetDiskID(dev, node_uuid)

      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
      names = _GenerateUniqueNames(self.lu, lv_names)

      (data_disk, meta_disk) = dev.children
      vg_data = data_disk.logical_id[0]
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
                             logical_id=(vg_data, names[0]),
                             params=data_disk.params)
      vg_meta = meta_disk.logical_id[0]
      lv_meta = objects.Disk(dev_type=constants.LD_LV,
                             size=constants.DRBD_META_SIZE,
                             logical_id=(vg_meta, names[1]),
                             params=meta_disk.params)

      new_lvs = [lv_data, lv_meta]
      old_lvs = [child.Copy() for child in dev.children]
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)

      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        try:
          _CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True,
                               GetInstanceInfoText(self.instance), False,
                               excl_stor)
        except errors.DeviceCreationError, e:
          raise errors.OpExecError("Can't create block device: %s" % e.message)

    return iv_names

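  # Illustrative note (not part of the original module): the mapping returned
  # by _CreateNewStorage() is keyed by the DRBD disk's iv_name and is later
  # consumed by _ExecDrbd8DiskOnly(); its shape is roughly:
  #
  #   iv_names = {"disk/0": (drbd_dev, [old_data_lv, old_meta_lv],
  #                          [new_data_lv, new_meta_lv])}
  #
  # where the old LVs are copies kept only so they can be renamed and removed.
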
  def _CheckDevices(self, node_uuid, iv_names):
    for name, (dev, _, _) in iv_names.iteritems():
      self.cfg.SetDiskID(dev, node_uuid)

      result = _BlockdevFind(self, node_uuid, dev, self.instance)

      msg = result.fail_msg
      if msg or not result.payload:
        if not msg:
          msg = "disk not found"
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))

      if result.payload.is_degraded:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

  def _RemoveOldStorage(self, node_uuid, iv_names):
    for name, (_, old_lvs, _) in iv_names.iteritems():
      self.lu.LogInfo("Remove logical volumes for %s", name)

      for lv in old_lvs:
        self.cfg.SetDiskID(lv, node_uuid)

        msg = self.rpc.call_blockdev_remove(node_uuid, lv).fail_msg
        if msg:
          self.lu.LogWarning("Can't remove old LV: %s", msg,
                             hint="remove unused LVs manually")

  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
    """Replace a disk on the primary or secondary for DRBD 8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
    self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(
      self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
      False)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    iv_names = self._CreateNewStorage(self.target_node_uuid)

    # Step: for each lv, detach+rename*2+attach
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)

      result = self.rpc.call_blockdev_removechildren(self.target_node_uuid, dev,
                                                     old_lvs)
      result.Raise("Can't detach drbd from local storage on node"
                   " %s for device %s" %
                   (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)

      # Build the rename list based on what LVs exist on the node
      rename_old_to_new = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(self.target_node_uuid, to_ren)
        if not result.fail_msg and result.payload:
          # device exists
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))

      self.lu.LogInfo("Renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
                                             rename_old_to_new)
      result.Raise("Can't rename old LVs on node %s" %
                   self.cfg.GetNodeName(self.target_node_uuid))

      # Now we rename the new LVs to the old LVs
      self.lu.LogInfo("Renaming the new LVs on the target node")
      rename_new_to_old = [(new, old.physical_id)
                           for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
                                             rename_new_to_old)
      result.Raise("Can't rename new LVs on node %s" %
                   self.cfg.GetNodeName(self.target_node_uuid))

      # Intermediate steps of in memory modifications
      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        self.cfg.SetDiskID(new, self.target_node_uuid)

      # We need to modify old_lvs so that removal later removes the
      # right LVs, not the newly added ones; note that old_lvs is a
      # copy here
      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        self.cfg.SetDiskID(disk, self.target_node_uuid)

      # Now that the new lvs have the old name, we can add them to the device
      self.lu.LogInfo("Adding new mirror component on %s",
                      self.cfg.GetNodeName(self.target_node_uuid))
      result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
                                                  (dev, self.instance), new_lvs)
      msg = result.fail_msg
      if msg:
        for new_lv in new_lvs:
          msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
                                               new_lv).fail_msg
          if msg2:
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                               hint=("cleanup manually the unused logical"
                                     " volumes"))
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)

    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # Release all node locks while waiting for sync
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)

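  # Illustrative note (not part of the original module): the rename step in
  # _ExecDrbd8DiskOnly() above only changes the LV name inside its volume
  # group; with temp_suffix taken from time.time(), an old LV such as
  #
  #   ("xenvg", "<unique-id>.disk0_data")
  #
  # becomes ("xenvg", "<unique-id>.disk0_data_replaced-1401278892") before it
  # is finally dropped in the "Removing old storage" step (the VG name and
  # timestamp here are made-up examples).
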
  def _ExecDrbd8Secondary(self, feedback_fn):
    """Replace the secondary node for DRBD 8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6

    pnode = self.instance.primary_node

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.instance.primary_node])
    self._CheckVolumeGroup([self.instance.primary_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.instance.primary_node, True, True)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg,
                                                  self.new_node_uuid)
    for idx, dev in enumerate(disks):
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        try:
          _CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance,
                               new_lv, True, GetInstanceInfoText(self.instance),
                               False, excl_stor)
        except errors.DeviceCreationError, e:
          raise errors.OpExecError("Can't create block device: %s" % e.message)

    # Step 4: drbd minors and drbd setup changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    minors = self.cfg.AllocateDRBDMinor([self.new_node_uuid
                                         for _ in self.instance.disks],
                                        self.instance.uuid)
    logging.debug("Allocated minors %r", minors)

    iv_names = {}
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
      # create new devices on new_node; note that we create two IDs:
      # one without port, so the drbd will be activated without
      # networking information on the new node at this stage, and one
      # with network, for the latter activation in step 4
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if self.instance.primary_node == o_node1:
        p_minor = o_minor1
      else:
        assert self.instance.primary_node == o_node2, "Three-node instance?"
        p_minor = o_minor2

      new_alone_id = (self.instance.primary_node, self.new_node_uuid, None,
                      p_minor, new_minor, o_secret)
      new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port,
                    p_minor, new_minor, o_secret)

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children,
                              size=dev.size,
                              params={})
      (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
                                            self.cfg)
      try:
        CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance,
                             anno_new_drbd,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)
      except errors.GenericError:
        self.cfg.ReleaseDRBDMinors(self.instance.uuid)
        raise

    # We have new devices, shutdown the drbd on the old secondary
    for idx, dev in enumerate(self.instance.disks):
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
      self.cfg.SetDiskID(dev, self.target_node_uuid)
      msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid,
                                            (dev, self.instance)).fail_msg
      if msg:
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                           " node: %s" % (idx, msg),
                           hint=("Please cleanup this device manually as"
                                 " soon as possible"))

    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
    result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
                                               self.instance.disks)[pnode]

    msg = result.fail_msg
    if msg:
      # detaches didn't succeed (unlikely)
      self.cfg.ReleaseDRBDMinors(self.instance.uuid)
      raise errors.OpExecError("Can't detach the disks from the network on"
                               " old node: %s" % (msg,))

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    self.lu.LogInfo("Updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      self.cfg.SetDiskID(dev, self.instance.primary_node)

    self.cfg.Update(self.instance, feedback_fn)

    # Release all node locks (the configuration has been updated)
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # and now perform the drbd attach
    self.lu.LogInfo("Attaching primary drbds to new secondary"
                    " (standalone => connected)")
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
                                            self.new_node_uuid],
                                           self.node_secondary_ip,
                                           (self.instance.disks, self.instance),
                                           self.instance.name,
                                           False)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
                           self.cfg.GetNodeName(to_node), msg,
                           hint=("please do a gnt-instance info to see the"
                                 " status of disks"))

    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
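
  # Illustrative note (not part of the original module): the two IDs built in
  # _ExecDrbd8Secondary() differ only in the port field; e.g. with primary P,
  # new secondary N, DRBD port 11000 (an example value), primary minor 0, new
  # minor 3 and shared secret S:
  #
  #   new_alone_id = (P, N, None,  0, 3, S)   # assembled without networking
  #   new_net_id   = (P, N, 11000, 0, 3, S)   # used once the network is attached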