root / lib / cmdlib / instance_storage.py @ 850c53f1

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with storage of instances."""

import itertools
import logging
import os
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import ht
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import objects
from ganeti import utils
from ganeti import opcodes
from ganeti import rpc
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
  AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \
  CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
  IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes, \
  CheckDiskTemplateEnabled
from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
  CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy

import ganeti.masterd.instance


_DISK_TEMPLATE_NAME_PREFIX = {
  constants.DT_PLAIN: "",
  constants.DT_RBD: ".rbd",
  constants.DT_EXT: ".ext",
  }


_DISK_TEMPLATE_DEVICE_TYPE = {
  constants.DT_PLAIN: constants.LD_LV,
  constants.DT_FILE: constants.LD_FILE,
  constants.DT_SHARED_FILE: constants.LD_FILE,
  constants.DT_BLOCK: constants.LD_BLOCKDEV,
  constants.DT_RBD: constants.LD_RBD,
  constants.DT_EXT: constants.LD_EXT,
  }


def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as an LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  """
  lu.cfg.SetDiskID(device, node_uuid)
  result = lu.rpc.call_blockdev_create(node_uuid, device, device.size,
                                       instance.name, force_open, info,
                                       excl_stor)
  result.Raise("Can't create block device %s on"
               " node %s for instance %s" % (device,
                                             lu.cfg.GetNodeName(node_uuid),
                                             instance.name))
  if device.physical_id is None:
    device.physical_id = result.payload


def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
                         info, force_open, excl_stor):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @attention: The device has to be annotated already.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has
      CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as an LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  @return: list of created devices
  """
  created_devices = []
  try:
    if device.CreateOnSecondary():
      force_create = True

    if device.children:
      for child in device.children:
        devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
                                    force_create, info, force_open, excl_stor)
        created_devices.extend(devs)

    if not force_create:
      return created_devices

    CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor)
    # The device has been completely created, so there is no point in keeping
    # its subdevices in the list. We just add the device itself instead.
    created_devices = [(node_uuid, device)]
    return created_devices

  except errors.DeviceCreationError, e:
    e.created_devices.extend(created_devices)
    raise e
  except errors.OpExecError, e:
    raise errors.DeviceCreationError(str(e), created_devices)

    
165

    
166
def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid):
167
  """Whether exclusive_storage is in effect for the given node.
168

169
  @type cfg: L{config.ConfigWriter}
170
  @param cfg: The cluster configuration
171
  @type node_uuid: string
172
  @param node_uuid: The node UUID
173
  @rtype: bool
174
  @return: The effective value of exclusive_storage
175
  @raise errors.OpPrereqError: if no node exists with the given name
176

177
  """
178
  ni = cfg.GetNodeInfo(node_uuid)
179
  if ni is None:
180
    raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
181
                               errors.ECODE_NOENT)
182
  return IsExclusiveStorageEnabledNode(cfg, ni)
183

    
184

    
185
def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
186
                    force_open):
187
  """Wrapper around L{_CreateBlockDevInner}.
188

189
  This method annotates the root device first.
190

191
  """
192
  (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
193
  excl_stor = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
194
  return _CreateBlockDevInner(lu, node_uuid, instance, disk, force_create, info,
195
                              force_open, excl_stor)


def _UndoCreateDisks(lu, disks_created):
  """Undo the work performed by L{CreateDisks}.

  This function is called in case of an error to undo the work of
  L{CreateDisks}.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @param disks_created: the result returned by L{CreateDisks}

  """
  for (node_uuid, disk) in disks_created:
    lu.cfg.SetDiskID(disk, node_uuid)
    result = lu.rpc.call_blockdev_remove(node_uuid, disk)
    result.Warn("Failed to remove newly-created disk %s on node %s" %
                (disk, lu.cfg.GetNodeName(node_uuid)), logging.warning)


def CreateDisks(lu, instance, to_skip=None, target_node_uuid=None, disks=None):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @type to_skip: list
  @param to_skip: list of indices to skip
  @type target_node_uuid: string
  @param target_node_uuid: if passed, overrides the target node for creation
  @type disks: list of L{objects.Disk}
  @param disks: the disks to create; if not specified, all the disks of the
      instance are created
  @return: information about the created disks, to be used to call
      L{_UndoCreateDisks}
  @raise errors.OpPrereqError: in case of error

  """
  info = GetInstanceInfoText(instance)
  if target_node_uuid is None:
    pnode_uuid = instance.primary_node
    all_node_uuids = instance.all_nodes
  else:
    pnode_uuid = target_node_uuid
    all_node_uuids = [pnode_uuid]

  if disks is None:
    disks = instance.disks

  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), instance.disk_template)

  if instance.disk_template in constants.DTS_FILEBASED:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir)

    result.Raise("Failed to create directory '%s' on"
                 " node %s" % (file_storage_dir,
                               lu.cfg.GetNodeName(pnode_uuid)))

  disks_created = []
  for idx, device in enumerate(disks):
    if to_skip and idx in to_skip:
      continue
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
    for node_uuid in all_node_uuids:
      f_create = node_uuid == pnode_uuid
      try:
        _CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
                        f_create)
        disks_created.append((node_uuid, device))
      except errors.DeviceCreationError, e:
        logging.warning("Creating disk %s for instance '%s' failed",
                        idx, instance.name)
        disks_created.extend(e.created_devices)
        _UndoCreateDisks(lu, disks_created)
        raise errors.OpExecError(e.message)
  return disks_created


def ComputeDiskSizePerVG(disk_template, disks):
  """Compute disk size requirements in the volume group.

  """
  def _compute(disks, payload):
    """Universal algorithm.

    """
    vgs = {}
    for disk in disks:
      vg_name = disk[constants.IDISK_VG]
      vgs[vg_name] = vgs.get(vg_name, 0) + disk[constants.IDISK_SIZE] + payload

    return vgs

  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: {},
    constants.DT_PLAIN: _compute(disks, 0),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
    constants.DT_FILE: {},
    constants.DT_SHARED_FILE: {},
    }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
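
# Example (illustrative values; "xenvg" is an arbitrary VG name): for two
# disks of 1024 MiB each in the same volume group,
#   ComputeDiskSizePerVG(constants.DT_PLAIN,
#                        [{constants.IDISK_VG: "xenvg",
#                          constants.IDISK_SIZE: 1024}] * 2)
# returns {"xenvg": 2048}, while constants.DT_DRBD8 additionally adds
# constants.DRBD_META_SIZE per disk to the same per-VG total.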


def ComputeDisks(op, default_vg):
  """Computes the instance disks.

  @param op: The instance opcode
  @param default_vg: The default_vg to assume

  @return: The computed disks

  """
  disks = []
  for disk in op.disks:
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
    if mode not in constants.DISK_ACCESS_SET:
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                 mode, errors.ECODE_INVAL)
    size = disk.get(constants.IDISK_SIZE, None)
    if size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      size = int(size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                 errors.ECODE_INVAL)

    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
    if ext_provider and op.disk_template != constants.DT_EXT:
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
                                 " disk template, not %s" %
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
                                  op.disk_template), errors.ECODE_INVAL)

    data_vg = disk.get(constants.IDISK_VG, default_vg)
    name = disk.get(constants.IDISK_NAME, None)
    if name is not None and name.lower() == constants.VALUE_NONE:
      name = None
    new_disk = {
      constants.IDISK_SIZE: size,
      constants.IDISK_MODE: mode,
      constants.IDISK_VG: data_vg,
      constants.IDISK_NAME: name,
      }

    for key in [
      constants.IDISK_METAVG,
      constants.IDISK_ADOPT,
      constants.IDISK_SPINDLES,
      ]:
      if key in disk:
        new_disk[key] = disk[key]

    # For extstorage, demand the `provider' option and add any
    # additional parameters (ext-params) to the dict
    if op.disk_template == constants.DT_EXT:
      if ext_provider:
        new_disk[constants.IDISK_PROVIDER] = ext_provider
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            new_disk[key] = disk[key]
      else:
        raise errors.OpPrereqError("Missing provider for template '%s'" %
                                   constants.DT_EXT, errors.ECODE_INVAL)

    disks.append(new_disk)

  return disks
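
# Example (illustrative values): a minimal disk specification such as
#   {constants.IDISK_SIZE: "1024"}
# is normalized by ComputeDisks into
#   {constants.IDISK_SIZE: 1024, constants.IDISK_MODE: constants.DISK_RDWR,
#    constants.IDISK_VG: default_vg, constants.IDISK_NAME: None}
# i.e. the size is converted to an integer and missing fields get defaults.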


def CheckRADOSFreeSpace():
  """Compute disk size requirements inside the RADOS cluster.

  """
  # For the RADOS cluster we assume there is always enough space.
  pass


def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
                         iv_name, p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  assert len(vgnames) == len(names) == 2
  port = lu.cfg.AllocatePort()
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())

  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgnames[0], names[0]),
                          params={})
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  dev_meta = objects.Disk(dev_type=constants.LD_LV,
                          size=constants.DRBD_META_SIZE,
                          logical_id=(vgnames[1], names[1]),
                          params={})
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary_uuid, secondary_uuid, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name, params={})
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  return drbd_dev
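
# Sketch of the device tree returned above for a single DRBD8 disk (LV names
# shown as they are built by GenerateDiskTemplate below):
#
#   LD_DRBD8  size=size
#             logical_id=(primary_uuid, secondary_uuid, port, p_minor,
#                         s_minor, shared_secret)
#     |- LD_LV "<prefix>_data"  size=size
#     `- LD_LV "<prefix>_meta"  size=constants.DRBD_META_SIZE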


def GenerateDiskTemplate(
  lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
  disk_info, file_storage_dir, file_driver, base_index,
  feedback_fn, full_disk_params):
  """Generate the entire disk layout for a given template type.

  """
  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []

  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), template_name)

  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_DRBD8:
    if len(secondary_node_uuids) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node_uuid = secondary_node_uuids[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node_uuid, remote_node_uuid] * len(disk_info), instance_uuid)

    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
                                                       full_disk_params)
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]

    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      data_vg = disk.get(constants.IDISK_VG, vgname)
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
      disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
                                      disk[constants.IDISK_SIZE],
                                      [data_vg, meta_vg],
                                      names[idx * 2:idx * 2 + 2],
                                      "disk/%d" % disk_index,
                                      minors[idx * 2], minors[idx * 2 + 1])
      disk_dev.mode = disk[constants.IDISK_MODE]
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disks.append(disk_dev)
  else:
    if secondary_node_uuids:
      raise errors.ProgrammerError("Wrong template configuration")

    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
    if name_prefix is None:
      names = None
    else:
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
                                        (name_prefix, base_index + i)
                                        for i in range(disk_count)])

    if template_name == constants.DT_PLAIN:

      def logical_id_fn(idx, _, disk):
        vg = disk.get(constants.IDISK_VG, vgname)
        return (vg, names[idx])

    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
      logical_id_fn = \
        lambda _, disk_index, disk: (file_driver,
                                     "%s/disk%d" % (file_storage_dir,
                                                    disk_index))
    elif template_name == constants.DT_BLOCK:
      logical_id_fn = \
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
                                       disk[constants.IDISK_ADOPT])
    elif template_name == constants.DT_RBD:
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
    elif template_name == constants.DT_EXT:
      def logical_id_fn(idx, _, disk):
        provider = disk.get(constants.IDISK_PROVIDER, None)
        if provider is None:
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
                                       " not found", constants.DT_EXT,
                                       constants.IDISK_PROVIDER)
        return (provider, names[idx])
    else:
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)

    dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]

    for idx, disk in enumerate(disk_info):
      params = {}
      # Only for the Ext template add disk_info to params
      if template_name == constants.DT_EXT:
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            params[key] = disk[key]
      disk_index = idx + base_index
      size = disk[constants.IDISK_SIZE]
      feedback_fn("* disk %s, size %s" %
                  (disk_index, utils.FormatUnit(size, "h")))
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
                              logical_id=logical_id_fn(idx, disk_index, disk),
                              iv_name="disk/%d" % disk_index,
                              mode=disk[constants.IDISK_MODE],
                              params=params,
                              spindles=disk.get(constants.IDISK_SPINDLES))
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
      disks.append(disk_dev)

  return disks


def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
  """Check the presence of the spindle options with exclusive_storage.

  @type diskdict: dict
  @param diskdict: disk parameters
  @type es_flag: bool
  @param es_flag: the effective value of the exclusive_storage flag
  @type required: bool
  @param required: whether spindles are required or just optional
  @raise errors.OpPrereqError: when spindles are given and they should not be

  """
  if (not es_flag and constants.IDISK_SPINDLES in diskdict and
      diskdict[constants.IDISK_SPINDLES] is not None):
    raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
                               " when exclusive storage is not active",
                               errors.ECODE_INVAL)
  if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or
                                diskdict[constants.IDISK_SPINDLES] is None)):
    raise errors.OpPrereqError("You must specify spindles in instance disks"
                               " when exclusive storage is active",
                               errors.ECODE_INVAL)
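
# Example (illustrative): with exclusive storage disabled,
#   CheckSpindlesExclusiveStorage({constants.IDISK_SPINDLES: 2}, False, True)
# raises OpPrereqError; with exclusive storage enabled the same call passes,
# while an empty disk dict combined with es_flag=True and required=True also
# raises, because spindles are then mandatory.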


class LUInstanceRecreateDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  _MODIFYABLE = compat.UniqueFrozenset([
    constants.IDISK_SIZE,
    constants.IDISK_MODE,
    constants.IDISK_SPINDLES,
    ])

  # New or changed disk parameters may have different semantics
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
    constants.IDISK_ADOPT,

    # TODO: Implement support changing VG while recreating
    constants.IDISK_VG,
    constants.IDISK_METAVG,
    constants.IDISK_PROVIDER,
    constants.IDISK_NAME,
    ]))

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)

    # FIXME
    # The allocator should actually run in "relocate" mode, but current
    # allocators don't support relocating all the nodes of an instance at
    # the same time. As a workaround we use "allocate" mode, but this is
    # suboptimal for two reasons:
    # - The instance name passed to the allocator is present in the list of
    #   existing instances, so there could be a conflict within the
    #   internal structures of the allocator. This doesn't happen with the
    #   current allocators, but it's a liability.
    # - The allocator counts the resources used by the instance twice: once
    #   because the instance exists already, and once because it tries to
    #   allocate a new instance.
    # The allocator could choose some of the nodes on which the instance is
    # running, but that's not a problem. If the instance nodes are broken,
    # they should already be marked as drained or offline, and hence
    # skipped by the allocator. If instance disks have been lost for other
    # reasons, then recreating the disks on the same nodes should be fine.
    disk_template = self.instance.disk_template
    spindle_use = be_full[constants.BE_SPINDLE_USE]
    disks = [{
      constants.IDISK_SIZE: d.size,
      constants.IDISK_MODE: d.mode,
      constants.IDISK_SPINDLES: d.spindles,
      } for d in self.instance.disks]
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
                                        disk_template=disk_template,
                                        tags=list(self.instance.GetTags()),
                                        os=self.instance.os,
                                        nics=[{}],
                                        vcpus=be_full[constants.BE_VCPUS],
                                        memory=be_full[constants.BE_MAXMEM],
                                        spindle_use=spindle_use,
                                        disks=disks,
                                        hypervisor=self.instance.hypervisor,
                                        node_whitelist=None)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    assert req.RequiredNodes() == len(self.instance.all_nodes)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 utils.CommaJoin(self.op.nodes))

  def CheckArguments(self):
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
      # Normalize and convert deprecated list of disk indices
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]

    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
    if duplicates:
      raise errors.OpPrereqError("Some disks have been specified more than"
                                 " once: %s" % utils.CommaJoin(duplicates),
                                 errors.ECODE_INVAL)

    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
    # when neither iallocator nor nodes are specified
    if self.op.iallocator or self.op.nodes:
      CheckIAllocatorOrNode(self, "iallocator", "nodes")

    for (idx, params) in self.op.disks:
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
      if unsupported:
        raise errors.OpPrereqError("Parameters for disk %s try to change"
                                   " unmodifyable parameter(s): %s" %
                                   (idx, utils.CommaJoin(unsupported)),
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    if self.op.nodes:
      (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      if self.op.iallocator:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.iallocator is not None
      assert not self.op.nodes
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock the primary group used by the instance optimistically; this
      # requires going via the node before it's locked, requiring
      # verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, primary_only=True)

    elif level == locking.LEVEL_NODE:
      # If an allocator is used, then we lock all the nodes in the current
      # instance group, as we don't know yet which ones will be selected;
      # if we replace the nodes without using an allocator, locks are
      # already declared in ExpandNames; otherwise, we need to lock all the
      # instance nodes for disk re-creation
      if self.op.iallocator:
        assert not self.op.nodes
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1

        # Lock member nodes of the group of the primary node
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
          self.needed_locks[locking.LEVEL_NODE].extend(
            self.cfg.GetNodeGroup(group_uuid).members)

        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
      elif not self.op.nodes:
        self._LockInstancesNodes(primary_only=False)
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    return BuildInstanceHookEnvByObject(self, self.instance)

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    if self.op.node_uuids:
      if len(self.op.node_uuids) != len(instance.all_nodes):
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
                                   " %d replacement nodes were specified" %
                                   (instance.name, len(instance.all_nodes),
                                    len(self.op.node_uuids)),
                                   errors.ECODE_INVAL)
      assert instance.disk_template != constants.DT_DRBD8 or \
             len(self.op.node_uuids) == 2
      assert instance.disk_template != constants.DT_PLAIN or \
             len(self.op.node_uuids) == 1
      primary_node = self.op.node_uuids[0]
    else:
      primary_node = instance.primary_node
    if not self.op.iallocator:
      CheckNodeOnline(self, primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name, errors.ECODE_INVAL)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      # Node group locks are acquired only for the primary node (and only
      # when the allocator is used)
      CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
                              primary_only=True)

    # if we replace nodes *and* the old primary is offline, we don't
    # check the instance state
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
    if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                         msg="cannot recreate disks")

    if self.op.disks:
      self.disks = dict(self.op.disks)
    else:
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))

    maxidx = max(self.disks.keys())
    if maxidx >= len(instance.disks):
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
                                 errors.ECODE_INVAL)

    if ((self.op.node_uuids or self.op.iallocator) and
         sorted(self.disks.keys()) != range(len(instance.disks))):
      raise errors.OpPrereqError("Can't recreate disks partially and"
                                 " change the nodes at the same time",
                                 errors.ECODE_INVAL)

    self.instance = instance

    if self.op.iallocator:
      self._RunAllocator()
      # Release unneeded node and node resource locks
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)

    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

    if self.op.node_uuids:
      node_uuids = self.op.node_uuids
    else:
      node_uuids = instance.all_nodes
    excl_stor = compat.any(
      rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
      )
    for new_params in self.disks.values():
      CheckSpindlesExclusiveStorage(new_params, excl_stor, False)

  def Exec(self, feedback_fn):
    """Recreate the disks.

    """
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    to_skip = []
    mods = [] # keeps track of needed changes

    for idx, disk in enumerate(self.instance.disks):
      try:
        changes = self.disks[idx]
      except KeyError:
        # Disk should not be recreated
        to_skip.append(idx)
        continue

      # update secondaries for disks, if needed
      if self.op.node_uuids and disk.dev_type == constants.LD_DRBD8:
        # need to update the nodes and minors
        assert len(self.op.node_uuids) == 2
        assert len(disk.logical_id) == 6 # otherwise disk internals
                                         # have changed
        (_, _, old_port, _, _, old_secret) = disk.logical_id
        new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
                                                self.instance.uuid)
        new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
                  new_minors[0], new_minors[1], old_secret)
        assert len(disk.logical_id) == len(new_id)
      else:
        new_id = None

      mods.append((idx, new_id, changes))

    # now that we have passed all asserts above, we can apply the mods
    # in a single run (to avoid partial changes)
    for idx, new_id, changes in mods:
      disk = self.instance.disks[idx]
      if new_id is not None:
        assert disk.dev_type == constants.LD_DRBD8
        disk.logical_id = new_id
      if changes:
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
                    mode=changes.get(constants.IDISK_MODE, None),
                    spindles=changes.get(constants.IDISK_SPINDLES, None))

    # change primary node, if needed
    if self.op.node_uuids:
      self.instance.primary_node = self.op.node_uuids[0]
      self.LogWarning("Changing the instance's nodes, you will have to"
                      " remove any disks left on the older nodes manually")

    if self.op.node_uuids:
      self.cfg.Update(self.instance, feedback_fn)

    # All touched nodes must be locked
    mylocks = self.owned_locks(locking.LEVEL_NODE)
    assert mylocks.issuperset(frozenset(self.instance.all_nodes))
    new_disks = CreateDisks(self, self.instance, to_skip=to_skip)

    # TODO: Release node locks before wiping, or explain why it's not possible
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
      wipedisks = [(idx, disk, 0)
                   for (idx, disk) in enumerate(self.instance.disks)
                   if idx not in to_skip]
      WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
                         cleanup=new_disks)


def _PerformNodeInfoCall(lu, node_uuids, vg):
  """Prepares the input and performs a node info call.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: list of string
  @param node_uuids: list of node UUIDs to perform the call for
  @type vg: string
  @param vg: the volume group's name

  """
  lvm_storage_units = [(constants.ST_LVM_VG, vg)]
  storage_units = rpc.PrepareStorageUnitsForNodes(lu.cfg, lvm_storage_units,
                                                  node_uuids)
  hvname = lu.cfg.GetHypervisorType()
  hvparams = lu.cfg.GetClusterInfo().hvparams
  nodeinfo = lu.rpc.call_node_info(node_uuids, storage_units,
                                   [(hvname, hvparams[hvname])])
  return nodeinfo


def _CheckVgCapacityForNode(node_name, node_info, vg, requested):
  """Checks the vg capacity for a given node.

  @type node_info: tuple (_, list of dicts, _)
  @param node_info: the result of the node info call for one node
  @type node_name: string
  @param node_name: the name of the node
  @type vg: string
  @param vg: volume group name
  @type requested: int
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  (_, space_info, _) = node_info
  lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType(
      space_info, constants.ST_LVM_VG)
  if not lvm_vg_info:
    raise errors.OpPrereqError("Can't retrieve storage information for LVM")
  vg_free = lvm_vg_info.get("storage_free", None)
  if not isinstance(vg_free, int):
    raise errors.OpPrereqError("Can't compute free disk space on node"
                               " %s for vg %s, result was '%s'" %
                               (node_name, vg, vg_free), errors.ECODE_ENVIRON)
  if requested > vg_free:
    raise errors.OpPrereqError("Not enough disk space on target node %s"
                               " vg %s: required %d MiB, available %d MiB" %
                               (node_name, vg, requested, vg_free),
                               errors.ECODE_NORES)


def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
  """Checks if nodes have enough free disk space in the specified VG.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type vg: C{str}
  @param vg: the volume group to check
  @type requested: C{int}
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  nodeinfo = _PerformNodeInfoCall(lu, node_uuids, vg)
  for node in node_uuids:
    node_name = lu.cfg.GetNodeName(node)
    info = nodeinfo[node]
    info.Raise("Cannot get current information from node %s" % node_name,
               prereq=True, ecode=errors.ECODE_ENVIRON)
    _CheckVgCapacityForNode(node_name, info.payload, vg, requested)


def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
  """Checks if nodes have enough free disk space in all the VGs.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type req_sizes: C{dict}
  @param req_sizes: the hash of vg and corresponding amount of disk in
      MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  for vg, req_size in req_sizes.items():
    _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, req_size)


def _DiskSizeInBytesToMebibytes(lu, size):
  """Converts a disk size in bytes to mebibytes.

  Warns and rounds up if the size isn't an even multiple of 1 MiB.

  """
  (mib, remainder) = divmod(size, 1024 * 1024)

  if remainder != 0:
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
                  " to not overwrite existing data (%s bytes will not be"
                  " wiped)", (1024 * 1024) - remainder)
    mib += 1

  return mib
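
# Example (illustrative): _DiskSizeInBytesToMebibytes(lu, 1073741824) returns
# 1024 with no warning, while a size of 1073741825 bytes gives
# divmod(size, 1048576) == (1024, 1), so the result is rounded up to 1025 and
# the warning reports the 1048575 bytes that will not be wiped.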


def _CalcEta(time_taken, written, total_size):
  """Calculates the ETA based on size written and total size.

  @param time_taken: The time taken so far
  @param written: amount written so far
  @param total_size: The total size of data to be written
  @return: The remaining time in seconds

  """
  avg_time = time_taken / float(written)
  return (total_size - written) * avg_time
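
# Example (illustrative): _CalcEta(30.0, 512, 2048) == 90.0, i.e. after 30
# seconds spent writing 512 of 2048 units, roughly 90 seconds remain at the
# same average rate.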


def WipeDisks(lu, instance, disks=None):
  """Wipes instance disks.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
  @param disks: Disk details; tuple contains disk index, disk object and the
    start offset

  """
  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  if disks is None:
    disks = [(idx, disk, 0)
             for (idx, disk) in enumerate(instance.disks)]

  for (_, device, _) in disks:
    lu.cfg.SetDiskID(device, node_uuid)

  logging.info("Pausing synchronization of disks of instance '%s'",
               instance.name)
  result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                  (map(compat.snd, disks),
                                                   instance),
                                                  True)
  result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)

  for idx, success in enumerate(result.payload):
    if not success:
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
                   " failed", idx, instance.name)

  try:
    for (idx, device, offset) in disks:
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
      wipe_chunk_size = \
        int(min(constants.MAX_WIPE_CHUNK,
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))

      size = device.size
      last_output = 0
      start_time = time.time()

      if offset == 0:
        info_text = ""
      else:
        info_text = (" (from %s to %s)" %
                     (utils.FormatUnit(offset, "h"),
                      utils.FormatUnit(size, "h")))

      lu.LogInfo("* Wiping disk %s%s", idx, info_text)

      logging.info("Wiping disk %d for instance %s on node %s using"
                   " chunk size %s", idx, instance.name, node_name,
                   wipe_chunk_size)

      while offset < size:
        wipe_size = min(wipe_chunk_size, size - offset)

        logging.debug("Wiping disk %d, offset %s, chunk %s",
                      idx, offset, wipe_size)

        result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
                                           offset, wipe_size)
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
                     (idx, offset, wipe_size))

        now = time.time()
        offset += wipe_size
        if now - last_output >= 60:
          eta = _CalcEta(now - start_time, offset, size)
          lu.LogInfo(" - done: %.1f%% ETA: %s",
                     offset / float(size) * 100, utils.FormatSeconds(eta))
          last_output = now
  finally:
    logging.info("Resuming synchronization of disks for instance '%s'",
                 instance.name)

    result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                    (map(compat.snd, disks),
                                                     instance),
                                                    False)

    if result.fail_msg:
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
                    node_name, result.fail_msg)
    else:
      for idx, success in enumerate(result.payload):
        if not success:
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
                        " failed", idx, instance.name)


def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
  """Wrapper for L{WipeDisks} that handles errors.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @param disks: see L{WipeDisks}
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
      case of error
  @raise errors.OpExecError: in case of failure

  """
  try:
    WipeDisks(lu, instance, disks=disks)
  except errors.OpExecError:
    logging.warning("Wiping disks for instance '%s' failed",
                    instance.name)
    _UndoCreateDisks(lu, cleanup)
    raise


def ExpandCheckDisks(instance, disks):
  """Return the instance disks selected by the disks list.

  @type disks: list of L{objects.Disk} or None
  @param disks: selected disks
  @rtype: list of L{objects.Disk}
  @return: selected instance disks to act on

  """
  if disks is None:
    return instance.disks
  else:
    if not set(disks).issubset(instance.disks):
      raise errors.ProgrammerError("Can only act on disks belonging to the"
                                   " target instance: expected a subset of %r,"
                                   " got %r" % (instance.disks, disks))
    return disks


def WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)

  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  for dev in disks:
    lu.cfg.SetDiskID(dev, node_uuid)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node_name)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node_name, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.LogInfo("- device %s: %5.2f%% done, %s",
                   disks[i].iv_name, mstat.sync_percent, rem_time)

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.LogInfo("Instance %s's disks are in sync", instance.name)

  return not cumul_degraded


def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is false, errors on the primary node are not
  ignored.

  """
  lu.cfg.MarkInstanceDisksInactive(instance.uuid)
  all_result = True
  disks = ExpandCheckDisks(instance, disks)

  for disk in disks:
    for node_uuid, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node_uuid)
      result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        if ((node_uuid == instance.primary_node and not ignore_primary) or
            (node_uuid != instance.primary_node and not result.offline)):
          all_result = False
  return all_result


def _SafeShutdownInstanceDisks(lu, instance, disks=None):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
  ShutdownInstanceDisks(lu, instance, disks=disks)
1260

    
1261

    
1262
def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
                          ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type disks: list of L{objects.Disk} or None
  @param disks: which disks to assemble (or all, if None)
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: a tuple of (disks_ok, device_info), where device_info is a list
      of (host, instance_visible_name, node_visible_name) triples with the
      mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  disks = ExpandCheckDisks(instance, disks)

  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # mark instance disks as active before doing actual work, so watcher does
  # not try to shut them down erroneously
  lu.cfg.MarkInstanceDisksActive(instance.uuid)

  # 1st pass, assemble on all nodes in secondary mode
  for idx, inst_disk in enumerate(disks):
    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
                                  instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node_uuid)
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                             instance.name, False, idx)
      msg = result.fail_msg
      if msg:
        is_offline_secondary = (node_uuid in instance.secondary_nodes and
                                result.offline)
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=False, pass=1): %s",
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        if not (ignore_secondaries or is_offline_secondary):
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for idx, inst_disk in enumerate(disks):
    dev_path = None

    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
                                  instance.primary_node):
      if node_uuid != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node_uuid)
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                             instance.name, True, idx)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=True, pass=2): %s",
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        disks_ok = False
      else:
        dev_path = result.payload

    device_info.append((lu.cfg.GetNodeName(instance.primary_node),
                        inst_disk.iv_name, dev_path))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  if not disks_ok:
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)

  return disks_ok, device_info


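# Return-value sketch for AssembleInstanceDisks (illustrative only): callers
# get a success flag plus, for every disk, the device path as visible on the
# primary node, e.g.
#
#   disks_ok, dev_info = AssembleInstanceDisks(lu, instance)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")
#   for node_name, iv_name, dev_path in dev_info:
#     lu.LogInfo("%s: %s is visible as %s", node_name, iv_name, dev_path)

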
def StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = AssembleInstanceDisks(lu, instance,
                                      ignore_secondaries=force)
  if not disks_ok:
    ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.LogWarning("",
                    hint=("If the message above refers to a secondary node,"
                          " you can retry the operation using '--force'"))
    raise errors.OpExecError("Disk consistency error")


class LUInstanceGrowDisk(LogicalUnit):
1379
  """Grow a disk of an instance.
1380

1381
  """
1382
  HPATH = "disk-grow"
1383
  HTYPE = constants.HTYPE_INSTANCE
1384
  REQ_BGL = False
1385

    
1386
  def ExpandNames(self):
1387
    self._ExpandAndLockInstance()
1388
    self.needed_locks[locking.LEVEL_NODE] = []
1389
    self.needed_locks[locking.LEVEL_NODE_RES] = []
1390
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1391
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
1392

    
1393
  def DeclareLocks(self, level):
1394
    if level == locking.LEVEL_NODE:
1395
      self._LockInstancesNodes()
1396
    elif level == locking.LEVEL_NODE_RES:
1397
      # Copy node locks
1398
      self.needed_locks[locking.LEVEL_NODE_RES] = \
1399
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1400

    
1401
  def BuildHooksEnv(self):
1402
    """Build hooks env.
1403

1404
    This runs on the master, the primary and all the secondaries.
1405

1406
    """
1407
    env = {
1408
      "DISK": self.op.disk,
1409
      "AMOUNT": self.op.amount,
1410
      "ABSOLUTE": self.op.absolute,
1411
      }
1412
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
1413
    return env
1414

    
1415
  def BuildHooksNodes(self):
1416
    """Build hooks nodes.
1417

1418
    """
1419
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1420
    return (nl, nl)
1421

    
1422
  def CheckPrereq(self):
1423
    """Check prerequisites.
1424

1425
    This checks that the instance is in the cluster.
1426

1427
    """
1428
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1429
    assert self.instance is not None, \
1430
      "Cannot retrieve locked instance %s" % self.op.instance_name
1431
    node_uuids = list(self.instance.all_nodes)
1432
    for node_uuid in node_uuids:
1433
      CheckNodeOnline(self, node_uuid)
1434
    self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)
1435

    
1436
    if self.instance.disk_template not in constants.DTS_GROWABLE:
1437
      raise errors.OpPrereqError("Instance's disk layout does not support"
1438
                                 " growing", errors.ECODE_INVAL)
1439

    
1440
    self.disk = self.instance.FindDisk(self.op.disk)
1441

    
1442
    if self.op.absolute:
1443
      self.target = self.op.amount
1444
      self.delta = self.target - self.disk.size
1445
      if self.delta < 0:
1446
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
1447
                                   "current disk size (%s)" %
1448
                                   (utils.FormatUnit(self.target, "h"),
1449
                                    utils.FormatUnit(self.disk.size, "h")),
1450
                                   errors.ECODE_STATE)
1451
    else:
1452
      self.delta = self.op.amount
1453
      self.target = self.disk.size + self.delta
1454
      if self.delta < 0:
1455
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
1456
                                   utils.FormatUnit(self.delta, "h"),
1457
                                   errors.ECODE_INVAL)
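
    # Worked example (illustrative only): for a disk currently sized
    # 10240 MiB,
    #   op.absolute=False, op.amount=2048  -> delta=2048, target=12288
    #   op.absolute=True,  op.amount=12288 -> target=12288, delta=2048
    # and in either mode a negative delta is rejected above.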
1458

    
1459
    self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta))
1460

    
1461
  def _CheckDiskSpace(self, node_uuids, req_vgspace):
1462
    template = self.instance.disk_template
1463
    if (template not in (constants.DTS_NO_FREE_SPACE_CHECK) and
1464
        not any(self.node_es_flags.values())):
1465
      # TODO: check the free disk space for file, when that feature will be
1466
      # supported
1467
      # With exclusive storage we need to do something smarter than just looking
1468
      # at free space, which, in the end, is basically a dry run. So we rely on
1469
      # the dry run performed in Exec() instead.
1470
      CheckNodesFreeDiskPerVG(self, node_uuids, req_vgspace)
1471

    
1472
  def Exec(self, feedback_fn):
1473
    """Execute disk grow.
1474

1475
    """
1476
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1477
    assert (self.owned_locks(locking.LEVEL_NODE) ==
1478
            self.owned_locks(locking.LEVEL_NODE_RES))
1479

    
1480
    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1481

    
1482
    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[self.disk])
1483
    if not disks_ok:
1484
      raise errors.OpExecError("Cannot activate block device to grow")
1485

    
1486
    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1487
                (self.op.disk, self.instance.name,
1488
                 utils.FormatUnit(self.delta, "h"),
1489
                 utils.FormatUnit(self.target, "h")))
1490

    
1491
    # First run all grow ops in dry-run mode
1492
    for node_uuid in self.instance.all_nodes:
1493
      self.cfg.SetDiskID(self.disk, node_uuid)
1494
      result = self.rpc.call_blockdev_grow(node_uuid,
1495
                                           (self.disk, self.instance),
1496
                                           self.delta, True, True,
1497
                                           self.node_es_flags[node_uuid])
1498
      result.Raise("Dry-run grow request failed to node %s" %
1499
                   self.cfg.GetNodeName(node_uuid))
1500

    
1501
    if wipe_disks:
1502
      # Get disk size from primary node for wiping
1503
      self.cfg.SetDiskID(self.disk, self.instance.primary_node)
1504
      result = self.rpc.call_blockdev_getdimensions(self.instance.primary_node,
1505
                                                    [self.disk])
1506
      result.Raise("Failed to retrieve disk size from node '%s'" %
1507
                   self.instance.primary_node)
1508

    
1509
      (disk_dimensions, ) = result.payload
1510

    
1511
      if disk_dimensions is None:
1512
        raise errors.OpExecError("Failed to retrieve disk size from primary"
1513
                                 " node '%s'" % self.instance.primary_node)
1514
      (disk_size_in_bytes, _) = disk_dimensions
1515

    
1516
      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
1517

    
1518
      assert old_disk_size >= self.disk.size, \
1519
        ("Retrieved disk size too small (got %s, should be at least %s)" %
1520
         (old_disk_size, self.disk.size))
1521
    else:
1522
      old_disk_size = None
1523

    
1524
    # We know that (as far as we can test) operations across different
1525
    # nodes will succeed, time to run it for real on the backing storage
1526
    for node_uuid in self.instance.all_nodes:
1527
      self.cfg.SetDiskID(self.disk, node_uuid)
1528
      result = self.rpc.call_blockdev_grow(node_uuid,
1529
                                           (self.disk, self.instance),
1530
                                           self.delta, False, True,
1531
                                           self.node_es_flags[node_uuid])
1532
      result.Raise("Grow request failed to node %s" %
1533
                   self.cfg.GetNodeName(node_uuid))
1534

    
1535
    # And now execute it for logical storage, on the primary node
1536
    node_uuid = self.instance.primary_node
1537
    self.cfg.SetDiskID(self.disk, node_uuid)
1538
    result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance),
1539
                                         self.delta, False, False,
1540
                                         self.node_es_flags[node_uuid])
1541
    result.Raise("Grow request failed to node %s" %
1542
                 self.cfg.GetNodeName(node_uuid))
1543

    
1544
    self.disk.RecordGrow(self.delta)
1545
    self.cfg.Update(self.instance, feedback_fn)
1546

    
1547
    # Changes have been recorded, release node lock
1548
    ReleaseLocks(self, locking.LEVEL_NODE)
1549

    
1550
    # Downgrade lock while waiting for sync
1551
    self.glm.downgrade(locking.LEVEL_INSTANCE)
1552

    
1553
    assert wipe_disks ^ (old_disk_size is None)
1554

    
1555
    if wipe_disks:
1556
      assert self.instance.disks[self.op.disk] == self.disk
1557

    
1558
      # Wipe newly added disk space
1559
      WipeDisks(self, self.instance,
1560
                disks=[(self.op.disk, self.disk, old_disk_size)])
1561

    
1562
    if self.op.wait_for_sync:
1563
      disk_abort = not WaitForSync(self, self.instance, disks=[self.disk])
1564
      if disk_abort:
1565
        self.LogWarning("Disk syncing has not returned a good status; check"
1566
                        " the instance")
1567
      if not self.instance.disks_active:
1568
        _SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk])
1569
    elif not self.instance.disks_active:
1570
      self.LogWarning("Not shutting down the disk even if the instance is"
1571
                      " not supposed to be running because no wait for"
1572
                      " sync mode was requested")
1573

    
1574
    assert self.owned_locks(locking.LEVEL_NODE_RES)
1575
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1576

    
1577

    
1578
class LUInstanceReplaceDisks(LogicalUnit):
1579
  """Replace the disks of an instance.
1580

1581
  """
1582
  HPATH = "mirrors-replace"
1583
  HTYPE = constants.HTYPE_INSTANCE
1584
  REQ_BGL = False
1585

    
1586
  def CheckArguments(self):
1587
    """Check arguments.
1588

1589
    """
1590
    if self.op.mode == constants.REPLACE_DISK_CHG:
1591
      if self.op.remote_node is None and self.op.iallocator is None:
1592
        raise errors.OpPrereqError("When changing the secondary either an"
1593
                                   " iallocator script must be used or the"
1594
                                   " new node given", errors.ECODE_INVAL)
1595
      else:
1596
        CheckIAllocatorOrNode(self, "iallocator", "remote_node")
1597

    
1598
    elif self.op.remote_node is not None or self.op.iallocator is not None:
1599
      # Not replacing the secondary
1600
      raise errors.OpPrereqError("The iallocator and new node options can"
1601
                                 " only be used when changing the"
1602
                                 " secondary node", errors.ECODE_INVAL)
1603

    
1604
  def ExpandNames(self):
1605
    self._ExpandAndLockInstance()
1606

    
1607
    assert locking.LEVEL_NODE not in self.needed_locks
1608
    assert locking.LEVEL_NODE_RES not in self.needed_locks
1609
    assert locking.LEVEL_NODEGROUP not in self.needed_locks
1610

    
1611
    assert self.op.iallocator is None or self.op.remote_node is None, \
1612
      "Conflicting options"
1613

    
1614
    if self.op.remote_node is not None:
1615
      (self.op.remote_node_uuid, self.op.remote_node) = \
1616
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
1617
                              self.op.remote_node)
1618

    
1619
      # Warning: do not remove the locking of the new secondary here
1620
      # unless DRBD8Dev.AddChildren is changed to work in parallel;
1621
      # currently it doesn't since parallel invocations of
1622
      # FindUnusedMinor will conflict
1623
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid]
1624
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1625
    else:
1626
      self.needed_locks[locking.LEVEL_NODE] = []
1627
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1628

    
1629
      if self.op.iallocator is not None:
1630
        # iallocator will select a new node in the same group
1631
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
1632
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1633

    
1634
    self.needed_locks[locking.LEVEL_NODE_RES] = []
1635

    
1636
    self.replacer = TLReplaceDisks(self, self.op.instance_uuid,
1637
                                   self.op.instance_name, self.op.mode,
1638
                                   self.op.iallocator, self.op.remote_node_uuid,
1639
                                   self.op.disks, self.op.early_release,
1640
                                   self.op.ignore_ipolicy)
1641

    
1642
    self.tasklets = [self.replacer]
1643

    
1644
  def DeclareLocks(self, level):
1645
    if level == locking.LEVEL_NODEGROUP:
1646
      assert self.op.remote_node_uuid is None
1647
      assert self.op.iallocator is not None
1648
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
1649

    
1650
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
1651
      # Lock all groups used by instance optimistically; this requires going
1652
      # via the node before it's locked, requiring verification later on
1653
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
1654
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid)
1655

    
1656
    elif level == locking.LEVEL_NODE:
1657
      if self.op.iallocator is not None:
1658
        assert self.op.remote_node_uuid is None
1659
        assert not self.needed_locks[locking.LEVEL_NODE]
1660
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
1661

    
1662
        # Lock member nodes of all locked groups
1663
        self.needed_locks[locking.LEVEL_NODE] = \
1664
          [node_uuid
1665
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1666
           for node_uuid in self.cfg.GetNodeGroup(group_uuid).members]
1667
      else:
1668
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1669

    
1670
        self._LockInstancesNodes()
1671

    
1672
    elif level == locking.LEVEL_NODE_RES:
1673
      # Reuse node locks
1674
      self.needed_locks[locking.LEVEL_NODE_RES] = \
1675
        self.needed_locks[locking.LEVEL_NODE]
1676

    
1677
  def BuildHooksEnv(self):
1678
    """Build hooks env.
1679

1680
    This runs on the master, the primary and all the secondaries.
1681

1682
    """
1683
    instance = self.replacer.instance
1684
    env = {
1685
      "MODE": self.op.mode,
1686
      "NEW_SECONDARY": self.op.remote_node,
1687
      "OLD_SECONDARY": self.cfg.GetNodeName(instance.secondary_nodes[0]),
1688
      }
1689
    env.update(BuildInstanceHookEnvByObject(self, instance))
1690
    return env
1691

    
1692
  def BuildHooksNodes(self):
1693
    """Build hooks nodes.
1694

1695
    """
1696
    instance = self.replacer.instance
1697
    nl = [
1698
      self.cfg.GetMasterNode(),
1699
      instance.primary_node,
1700
      ]
1701
    if self.op.remote_node_uuid is not None:
1702
      nl.append(self.op.remote_node_uuid)
1703
    return nl, nl
1704

    
1705
  def CheckPrereq(self):
1706
    """Check prerequisites.
1707

1708
    """
1709
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
1710
            self.op.iallocator is None)
1711

    
1712
    # Verify if node group locks are still correct
1713
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
1714
    if owned_groups:
1715
      CheckInstanceNodeGroups(self.cfg, self.op.instance_uuid, owned_groups)
1716

    
1717
    return LogicalUnit.CheckPrereq(self)
1718

    
1719

    
1720
class LUInstanceActivateDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              AssembleInstanceDisks(self, self.instance,
                                    ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    if self.op.wait_for_sync:
      if not WaitForSync(self, self.instance):
        self.cfg.MarkInstanceDisksInactive(self.instance.uuid)
        raise errors.OpExecError("Some disks of the instance are degraded!")

    return disks_info


class LUInstanceDeactivateDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    if self.op.force:
      ShutdownInstanceDisks(self, self.instance)
    else:
      _SafeShutdownInstanceDisks(self, self.instance)


def _CheckDiskConsistencyInner(lu, instance, dev, node_uuid, on_primary,
                               ldisk=False):
  """Check that mirrors are not degraded.

  @attention: The device has to be annotated already.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node_uuid)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node_uuid, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s",
                    lu.cfg.GetNodeName(node_uuid), msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid))
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistencyInner(lu, instance, child,
                                                     node_uuid, on_primary)

  return result


def CheckDiskConsistency(lu, instance, dev, node_uuid, on_primary, ldisk=False):
  """Wrapper around L{_CheckDiskConsistencyInner}.

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return _CheckDiskConsistencyInner(lu, instance, disk, node_uuid, on_primary,
                                    ldisk=ldisk)


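# Usage sketch (illustrative only): callers treat a False return as "unsafe to
# proceed", mirroring TLReplaceDisks._CheckDisksConsistency below:
#
#   if not CheckDiskConsistency(lu, instance, dev, node_uuid, on_primary,
#                               ldisk=True):
#     raise errors.OpExecError("Node has degraded storage")

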
def _BlockdevFind(lu, node_uuid, dev, instance):
  """Wrapper around call_blockdev_find to annotate diskparams.

  @param lu: A reference to the lu object
  @param node_uuid: The node to call out to
  @param dev: The device to find
  @param instance: The instance object the device belongs to
  @return: The result of the rpc call

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return lu.rpc.call_blockdev_find(node_uuid, disk)


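# Result-handling sketch (illustrative only), following the pattern of the
# callers further below:
#
#   result = _BlockdevFind(self, node_uuid, dev, instance)
#   if result.fail_msg or not result.payload:
#     ...                                # RPC failed or device not found
#   elif result.payload.is_degraded:
#     ...                                # device found but not healthy

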
def _GenerateUniqueNames(lu, exts):
  """Generate suitable LV names.

  This will generate a unique logical volume name for each of the
  given suffixes.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
    results.append("%s%s" % (new_id, val))
  return results


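# Example (illustrative only), using the per-disk suffixes that
# TLReplaceDisks._CreateNewStorage passes in:
#
#   _GenerateUniqueNames(lu, [".disk0_data", ".disk0_meta"])
#
# returns something like
#
#   ["<unique-id>.disk0_data", "<unique-id>.disk0_meta"]
#
# i.e. one freshly generated unique ID with each suffix appended.

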
class TLReplaceDisks(Tasklet):
1875
  """Replaces disks for an instance.
1876

1877
  Note: Locking is not within the scope of this class.
1878

1879
  """
1880
  def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name,
1881
               remote_node_uuid, disks, early_release, ignore_ipolicy):
1882
    """Initializes this class.
1883

1884
    """
1885
    Tasklet.__init__(self, lu)
1886

    
1887
    # Parameters
1888
    self.instance_uuid = instance_uuid
1889
    self.instance_name = instance_name
1890
    self.mode = mode
1891
    self.iallocator_name = iallocator_name
1892
    self.remote_node_uuid = remote_node_uuid
1893
    self.disks = disks
1894
    self.early_release = early_release
1895
    self.ignore_ipolicy = ignore_ipolicy
1896

    
1897
    # Runtime data
1898
    self.instance = None
1899
    self.new_node_uuid = None
1900
    self.target_node_uuid = None
1901
    self.other_node_uuid = None
1902
    self.remote_node_info = None
1903
    self.node_secondary_ip = None
1904

    
1905
  @staticmethod
1906
  def _RunAllocator(lu, iallocator_name, instance_uuid,
1907
                    relocate_from_node_uuids):
1908
    """Compute a new secondary node using an IAllocator.
1909

1910
    """
1911
    req = iallocator.IAReqRelocate(
1912
          inst_uuid=instance_uuid,
1913
          relocate_from_node_uuids=list(relocate_from_node_uuids))
1914
    ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
1915

    
1916
    ial.Run(iallocator_name)
1917

    
1918
    if not ial.success:
1919
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
1920
                                 " %s" % (iallocator_name, ial.info),
1921
                                 errors.ECODE_NORES)
1922

    
1923
    remote_node_name = ial.result[0]
1924
    remote_node = lu.cfg.GetNodeInfoByName(remote_node_name)
1925

    
1926
    if remote_node is None:
1927
      raise errors.OpPrereqError("Node %s not found in configuration" %
1928
                                 remote_node_name, errors.ECODE_NOENT)
1929

    
1930
    lu.LogInfo("Selected new secondary for instance '%s': %s",
1931
               instance_uuid, remote_node_name)
1932

    
1933
    return remote_node.uuid
1934

    
1935
  def _FindFaultyDisks(self, node_uuid):
1936
    """Wrapper for L{FindFaultyInstanceDisks}.
1937

1938
    """
1939
    return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
1940
                                   node_uuid, True)
1941

    
1942
  def _CheckDisksActivated(self, instance):
1943
    """Checks if the instance disks are activated.
1944

1945
    @param instance: The instance to check disks
1946
    @return: True if they are activated, False otherwise
1947

1948
    """
1949
    node_uuids = instance.all_nodes
1950

    
1951
    for idx, dev in enumerate(instance.disks):
1952
      for node_uuid in node_uuids:
1953
        self.lu.LogInfo("Checking disk/%d on %s", idx,
1954
                        self.cfg.GetNodeName(node_uuid))
1955
        self.cfg.SetDiskID(dev, node_uuid)
1956

    
1957
        result = _BlockdevFind(self, node_uuid, dev, instance)
1958

    
1959
        if result.offline:
1960
          continue
1961
        elif result.fail_msg or not result.payload:
1962
          return False
1963

    
1964
    return True
1965

    
1966
  def CheckPrereq(self):
1967
    """Check prerequisites.
1968

1969
    This checks that the instance is in the cluster.
1970

1971
    """
1972
    self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
1973
    assert self.instance is not None, \
1974
      "Cannot retrieve locked instance %s" % self.instance_name
1975

    
1976
    if self.instance.disk_template != constants.DT_DRBD8:
1977
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
1978
                                 " instances", errors.ECODE_INVAL)
1979

    
1980
    if len(self.instance.secondary_nodes) != 1:
1981
      raise errors.OpPrereqError("The instance has a strange layout,"
1982
                                 " expected one secondary but found %d" %
1983
                                 len(self.instance.secondary_nodes),
1984
                                 errors.ECODE_FAULT)
1985

    
1986
    secondary_node_uuid = self.instance.secondary_nodes[0]
1987

    
1988
    if self.iallocator_name is None:
1989
      remote_node_uuid = self.remote_node_uuid
1990
    else:
1991
      remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
1992
                                            self.instance.uuid,
1993
                                            self.instance.secondary_nodes)
1994

    
1995
    if remote_node_uuid is None:
1996
      self.remote_node_info = None
1997
    else:
1998
      assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
1999
             "Remote node '%s' is not locked" % remote_node_uuid
2000

    
2001
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
2002
      assert self.remote_node_info is not None, \
2003
        "Cannot retrieve locked node %s" % remote_node_uuid
2004

    
2005
    if remote_node_uuid == self.instance.primary_node:
2006
      raise errors.OpPrereqError("The specified node is the primary node of"
2007
                                 " the instance", errors.ECODE_INVAL)
2008

    
2009
    if remote_node_uuid == secondary_node_uuid:
2010
      raise errors.OpPrereqError("The specified node is already the"
2011
                                 " secondary node of the instance",
2012
                                 errors.ECODE_INVAL)
2013

    
2014
    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
2015
                                    constants.REPLACE_DISK_CHG):
2016
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
2017
                                 errors.ECODE_INVAL)
2018

    
2019
    if self.mode == constants.REPLACE_DISK_AUTO:
2020
      if not self._CheckDisksActivated(self.instance):
2021
        raise errors.OpPrereqError("Please run activate-disks on instance %s"
2022
                                   " first" % self.instance_name,
2023
                                   errors.ECODE_STATE)
2024
      faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
2025
      faulty_secondary = self._FindFaultyDisks(secondary_node_uuid)
2026

    
2027
      if faulty_primary and faulty_secondary:
2028
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
2029
                                   " one node and can not be repaired"
2030
                                   " automatically" % self.instance_name,
2031
                                   errors.ECODE_STATE)
2032

    
2033
      if faulty_primary:
2034
        self.disks = faulty_primary
2035
        self.target_node_uuid = self.instance.primary_node
2036
        self.other_node_uuid = secondary_node_uuid
2037
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
2038
      elif faulty_secondary:
2039
        self.disks = faulty_secondary
2040
        self.target_node_uuid = secondary_node_uuid
2041
        self.other_node_uuid = self.instance.primary_node
2042
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
2043
      else:
2044
        self.disks = []
2045
        check_nodes = []
2046

    
2047
    else:
2048
      # Non-automatic modes
2049
      if self.mode == constants.REPLACE_DISK_PRI:
2050
        self.target_node_uuid = self.instance.primary_node
2051
        self.other_node_uuid = secondary_node_uuid
2052
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
2053

    
2054
      elif self.mode == constants.REPLACE_DISK_SEC:
2055
        self.target_node_uuid = secondary_node_uuid
2056
        self.other_node_uuid = self.instance.primary_node
2057
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
2058

    
2059
      elif self.mode == constants.REPLACE_DISK_CHG:
2060
        self.new_node_uuid = remote_node_uuid
2061
        self.other_node_uuid = self.instance.primary_node
2062
        self.target_node_uuid = secondary_node_uuid
2063
        check_nodes = [self.new_node_uuid, self.other_node_uuid]
2064

    
2065
        CheckNodeNotDrained(self.lu, remote_node_uuid)
2066
        CheckNodeVmCapable(self.lu, remote_node_uuid)
2067

    
2068
        old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid)
2069
        assert old_node_info is not None
2070
        if old_node_info.offline and not self.early_release:
2071
          # doesn't make sense to delay the release
2072
          self.early_release = True
2073
          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
2074
                          " early-release mode", secondary_node_uuid)
2075

    
2076
      else:
2077
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
2078
                                     self.mode)
2079

    
2080
      # If not specified all disks should be replaced
2081
      if not self.disks:
2082
        self.disks = range(len(self.instance.disks))
2083

    
2084
    # TODO: This is ugly, but right now we can't distinguish between internal
2085
    # submitted opcode and external one. We should fix that.
2086
    if self.remote_node_info:
2087
      # We change the node, lets verify it still meets instance policy
2088
      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
2089
      cluster = self.cfg.GetClusterInfo()
2090
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2091
                                                              new_group_info)
2092
      CheckTargetNodeIPolicy(self, ipolicy, self.instance,
2093
                             self.remote_node_info, self.cfg,
2094
                             ignore=self.ignore_ipolicy)
2095

    
2096
    for node_uuid in check_nodes:
2097
      CheckNodeOnline(self.lu, node_uuid)
2098

    
2099
    touched_nodes = frozenset(node_uuid for node_uuid in [self.new_node_uuid,
2100
                                                          self.other_node_uuid,
2101
                                                          self.target_node_uuid]
2102
                              if node_uuid is not None)
2103

    
2104
    # Release unneeded node and node resource locks
2105
    ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
2106
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
2107
    ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
2108

    
2109
    # Release any owned node group
2110
    ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
2111

    
2112
    # Check whether disks are valid
2113
    for disk_idx in self.disks:
2114
      self.instance.FindDisk(disk_idx)
2115

    
2116
    # Get secondary node IP addresses
2117
    self.node_secondary_ip = dict((uuid, node.secondary_ip) for (uuid, node)
2118
                                  in self.cfg.GetMultiNodeInfo(touched_nodes))
2119

    
2120
  def Exec(self, feedback_fn):
2121
    """Execute disk replacement.
2122

2123
    This dispatches the disk replacement to the appropriate handler.
2124

2125
    """
2126
    if __debug__:
2127
      # Verify owned locks before starting operation
2128
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
2129
      assert set(owned_nodes) == set(self.node_secondary_ip), \
2130
          ("Incorrect node locks, owning %s, expected %s" %
2131
           (owned_nodes, self.node_secondary_ip.keys()))
2132
      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
2133
              self.lu.owned_locks(locking.LEVEL_NODE_RES))
2134
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
2135

    
2136
      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
2137
      assert list(owned_instances) == [self.instance_name], \
2138
          "Instance '%s' not locked" % self.instance_name
2139

    
2140
      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
2141
          "Should not own any node group lock at this point"
2142

    
2143
    if not self.disks:
2144
      feedback_fn("No disks need replacement for instance '%s'" %
2145
                  self.instance.name)
2146
      return
2147

    
2148
    feedback_fn("Replacing disk(s) %s for instance '%s'" %
2149
                (utils.CommaJoin(self.disks), self.instance.name))
2150
    feedback_fn("Current primary node: %s" %
2151
                self.cfg.GetNodeName(self.instance.primary_node))
2152
    feedback_fn("Current seconary node: %s" %
2153
                utils.CommaJoin(self.cfg.GetNodeNames(
2154
                                  self.instance.secondary_nodes)))
2155

    
2156
    activate_disks = not self.instance.disks_active
2157

    
2158
    # Activate the instance disks if we're replacing them on a down instance
2159
    if activate_disks:
2160
      StartInstanceDisks(self.lu, self.instance, True)
2161

    
2162
    try:
2163
      # Should we replace the secondary node?
2164
      if self.new_node_uuid is not None:
2165
        fn = self._ExecDrbd8Secondary
2166
      else:
2167
        fn = self._ExecDrbd8DiskOnly
2168

    
2169
      result = fn(feedback_fn)
2170
    finally:
2171
      # Deactivate the instance disks if we're replacing them on a
2172
      # down instance
2173
      if activate_disks:
2174
        _SafeShutdownInstanceDisks(self.lu, self.instance)
2175

    
2176
    assert not self.lu.owned_locks(locking.LEVEL_NODE)
2177

    
2178
    if __debug__:
2179
      # Verify owned locks
2180
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
2181
      nodes = frozenset(self.node_secondary_ip)
2182
      assert ((self.early_release and not owned_nodes) or
2183
              (not self.early_release and not (set(owned_nodes) - nodes))), \
2184
        ("Not owning the correct locks, early_release=%s, owned=%r,"
2185
         " nodes=%r" % (self.early_release, owned_nodes, nodes))
2186

    
2187
    return result
2188

    
2189
  def _CheckVolumeGroup(self, node_uuids):
2190
    self.lu.LogInfo("Checking volume groups")
2191

    
2192
    vgname = self.cfg.GetVGName()
2193

    
2194
    # Make sure volume group exists on all involved nodes
2195
    results = self.rpc.call_vg_list(node_uuids)
2196
    if not results:
2197
      raise errors.OpExecError("Can't list volume groups on the nodes")
2198

    
2199
    for node_uuid in node_uuids:
2200
      res = results[node_uuid]
2201
      res.Raise("Error checking node %s" % self.cfg.GetNodeName(node_uuid))
2202
      if vgname not in res.payload:
2203
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
2204
                                 (vgname, self.cfg.GetNodeName(node_uuid)))
2205

    
2206
  def _CheckDisksExistence(self, node_uuids):
2207
    # Check disk existence
2208
    for idx, dev in enumerate(self.instance.disks):
2209
      if idx not in self.disks:
2210
        continue
2211

    
2212
      for node_uuid in node_uuids:
2213
        self.lu.LogInfo("Checking disk/%d on %s", idx,
2214
                        self.cfg.GetNodeName(node_uuid))
2215
        self.cfg.SetDiskID(dev, node_uuid)
2216

    
2217
        result = _BlockdevFind(self, node_uuid, dev, self.instance)
2218

    
2219
        msg = result.fail_msg
2220
        if msg or not result.payload:
2221
          if not msg:
2222
            msg = "disk not found"
2223
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
2224
                                   (idx, self.cfg.GetNodeName(node_uuid), msg))
2225

    
2226
  def _CheckDisksConsistency(self, node_uuid, on_primary, ldisk):
2227
    for idx, dev in enumerate(self.instance.disks):
2228
      if idx not in self.disks:
2229
        continue
2230

    
2231
      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
2232
                      (idx, self.cfg.GetNodeName(node_uuid)))
2233

    
2234
      if not CheckDiskConsistency(self.lu, self.instance, dev, node_uuid,
2235
                                  on_primary, ldisk=ldisk):
2236
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
2237
                                 " replace disks for instance %s" %
2238
                                 (self.cfg.GetNodeName(node_uuid),
2239
                                  self.instance.name))
2240

    
2241
  def _CreateNewStorage(self, node_uuid):
2242
    """Create new storage on the primary or secondary node.
2243

2244
    This is only used for same-node replaces, not for changing the
2245
    secondary node, hence we don't want to modify the existing disk.
2246

2247
    """
2248
    iv_names = {}
2249

    
2250
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2251
    for idx, dev in enumerate(disks):
2252
      if idx not in self.disks:
2253
        continue
2254

    
2255
      self.lu.LogInfo("Adding storage on %s for disk/%d",
2256
                      self.cfg.GetNodeName(node_uuid), idx)
2257

    
2258
      self.cfg.SetDiskID(dev, node_uuid)
2259

    
2260
      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
2261
      names = _GenerateUniqueNames(self.lu, lv_names)
2262

    
2263
      (data_disk, meta_disk) = dev.children
2264
      vg_data = data_disk.logical_id[0]
2265
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
2266
                             logical_id=(vg_data, names[0]),
2267
                             params=data_disk.params)
2268
      vg_meta = meta_disk.logical_id[0]
2269
      lv_meta = objects.Disk(dev_type=constants.LD_LV,
2270
                             size=constants.DRBD_META_SIZE,
2271
                             logical_id=(vg_meta, names[1]),
2272
                             params=meta_disk.params)
2273

    
2274
      new_lvs = [lv_data, lv_meta]
2275
      old_lvs = [child.Copy() for child in dev.children]
2276
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
2277
      excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)
2278

    
2279
      # we pass force_create=True to force the LVM creation
2280
      for new_lv in new_lvs:
2281
        try:
2282
          _CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True,
2283
                               GetInstanceInfoText(self.instance), False,
2284
                               excl_stor)
2285
        except errors.DeviceCreationError, e:
2286
          raise errors.OpExecError("Can't create block device: %s" % e.message)
2287

    
2288
    return iv_names
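
  # Shape sketch (illustrative only) of the mapping returned above and
  # consumed by _CheckDevices/_RemoveOldStorage below:
  #
  #   iv_names = {
  #     "disk/0": (drbd_dev, [old_data_lv, old_meta_lv],
  #                [new_data_lv, new_meta_lv]),
  #   }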

    
2290
  def _CheckDevices(self, node_uuid, iv_names):
2291
    for name, (dev, _, _) in iv_names.iteritems():
2292
      self.cfg.SetDiskID(dev, node_uuid)
2293

    
2294
      result = _BlockdevFind(self, node_uuid, dev, self.instance)
2295

    
2296
      msg = result.fail_msg
2297
      if msg or not result.payload:
2298
        if not msg:
2299
          msg = "disk not found"
2300
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
2301
                                 (name, msg))
2302

    
2303
      if result.payload.is_degraded:
2304
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
2305

    
2306
  def _RemoveOldStorage(self, node_uuid, iv_names):
2307
    for name, (_, old_lvs, _) in iv_names.iteritems():
2308
      self.lu.LogInfo("Remove logical volumes for %s", name)
2309

    
2310
      for lv in old_lvs:
2311
        self.cfg.SetDiskID(lv, node_uuid)
2312

    
2313
        msg = self.rpc.call_blockdev_remove(node_uuid, lv).fail_msg
2314
        if msg:
2315
          self.lu.LogWarning("Can't remove old LV: %s", msg,
2316
                             hint="remove unused LVs manually")
2317

    
2318
  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
2319
    """Replace a disk on the primary or secondary for DRBD 8.
2320

2321
    The algorithm for replace is quite complicated:
2322

2323
      1. for each disk to be replaced:
2324

2325
        1. create new LVs on the target node with unique names
2326
        1. detach old LVs from the drbd device
2327
        1. rename old LVs to name_replaced.<time_t>
2328
        1. rename new LVs to old LVs
2329
        1. attach the new LVs (with the old names now) to the drbd device
2330

2331
      1. wait for sync across all devices
2332

2333
      1. for each modified disk:
2334

2335
        1. remove old LVs (which have the name name_replaced.<time_t>)
2336

2337
    Failures are not very well handled.
2338

2339
    """
    steps_total = 6
2341

    
2342
    # Step: check device activation
2343
    self.lu.LogStep(1, steps_total, "Check device existence")
2344
    self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
2345
    self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])
2346

    
2347
    # Step: check other node consistency
2348
    self.lu.LogStep(2, steps_total, "Check peer consistency")
2349
    self._CheckDisksConsistency(
2350
      self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
2351
      False)
2352

    
2353
    # Step: create new storage
2354
    self.lu.LogStep(3, steps_total, "Allocate new storage")
2355
    iv_names = self._CreateNewStorage(self.target_node_uuid)
2356

    
2357
    # Step: for each lv, detach+rename*2+attach
2358
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2359
    for dev, old_lvs, new_lvs in iv_names.itervalues():
2360
      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
2361

    
2362
      result = self.rpc.call_blockdev_removechildren(self.target_node_uuid, dev,
2363
                                                     old_lvs)
2364
      result.Raise("Can't detach drbd from local storage on node"
2365
                   " %s for device %s" %
2366
                   (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))
2367
      #dev.children = []
2368
      #cfg.Update(instance)
2369

    
2370
      # ok, we created the new LVs, so now we know we have the needed
2371
      # storage; as such, we proceed on the target node to rename
2372
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
2373
      # using the assumption that logical_id == physical_id (which in
2374
      # turn is the unique_id on that node)
2375

    
2376
      # FIXME(iustin): use a better name for the replaced LVs
2377
      temp_suffix = int(time.time())
2378
      ren_fn = lambda d, suff: (d.physical_id[0],
2379
                                d.physical_id[1] + "_replaced-%s" % suff)
2380

    
2381
      # Build the rename list based on what LVs exist on the node
2382
      rename_old_to_new = []
2383
      for to_ren in old_lvs:
2384
        result = self.rpc.call_blockdev_find(self.target_node_uuid, to_ren)
2385
        if not result.fail_msg and result.payload:
2386
          # device exists
2387
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
2388

    
2389
      self.lu.LogInfo("Renaming the old LVs on the target node")
2390
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2391
                                             rename_old_to_new)
2392
      result.Raise("Can't rename old LVs on node %s" %
2393
                   self.cfg.GetNodeName(self.target_node_uuid))
2394

    
2395
      # Now we rename the new LVs to the old LVs
2396
      self.lu.LogInfo("Renaming the new LVs on the target node")
2397
      rename_new_to_old = [(new, old.physical_id)
2398
                           for old, new in zip(old_lvs, new_lvs)]
2399
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2400
                                             rename_new_to_old)
2401
      result.Raise("Can't rename new LVs on node %s" %
2402
                   self.cfg.GetNodeName(self.target_node_uuid))
2403

    
2404
      # Intermediate steps of in memory modifications
2405
      for old, new in zip(old_lvs, new_lvs):
2406
        new.logical_id = old.logical_id
2407
        self.cfg.SetDiskID(new, self.target_node_uuid)
2408

    
2409
      # We need to modify old_lvs so that removal later removes the
2410
      # right LVs, not the newly added ones; note that old_lvs is a
2411
      # copy here
2412
      for disk in old_lvs:
2413
        disk.logical_id = ren_fn(disk, temp_suffix)
2414
        self.cfg.SetDiskID(disk, self.target_node_uuid)
2415

    
2416
      # Now that the new lvs have the old name, we can add them to the device
2417
      self.lu.LogInfo("Adding new mirror component on %s",
2418
                      self.cfg.GetNodeName(self.target_node_uuid))
2419
      result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
2420
                                                  (dev, self.instance), new_lvs)
2421
      msg = result.fail_msg
2422
      if msg:
2423
        for new_lv in new_lvs:
2424
          msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
2425
                                               new_lv).fail_msg
2426
          if msg2:
2427
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
2428
                               hint=("cleanup manually the unused logical"
2429
                                     "volumes"))
2430
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
2431

    
2432
    cstep = itertools.count(5)
2433

    
2434
    if self.early_release:
2435
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2436
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
2437
      # TODO: Check if releasing locks early still makes sense
2438
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2439
    else:
2440
      # Release all resource locks except those used by the instance
2441
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2442
                   keep=self.node_secondary_ip.keys())
2443

    
2444
    # Release all node locks while waiting for sync
2445
    ReleaseLocks(self.lu, locking.LEVEL_NODE)
2446

    
2447
    # TODO: Can the instance lock be downgraded here? Take the optional disk
2448
    # shutdown in the caller into consideration.
2449

    
2450
    # Wait for sync
2451
    # This can fail as the old devices are degraded and _WaitForSync
2452
    # does a combined result over all disks, so we don't check its return value
2453
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2454
    WaitForSync(self.lu, self.instance)
2455

    
2456
    # Check all devices manually
2457
    self._CheckDevices(self.instance.primary_node, iv_names)
2458

    
2459
    # Step: remove old storage
2460
    if not self.early_release:
2461
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2462
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
2463

    
2464
  def _ExecDrbd8Secondary(self, feedback_fn):
2465
    """Replace the secondary node for DRBD 8.
2466

2467
    The algorithm for replace is quite complicated:
2468
      - for all disks of the instance:
2469
        - create new LVs on the new node with same names
2470
        - shutdown the drbd device on the old secondary
2471
        - disconnect the drbd network on the primary
2472
        - create the drbd device on the new secondary
2473
        - network attach the drbd on the primary, using an artifice:
2474
          the drbd code for Attach() will connect to the network if it
2475
          finds a device which is connected to the good local disks but
2476
          not network enabled
2477
      - wait for sync across all devices
2478
      - remove all disks from the old secondary
2479

2480
    Failures are not very well handled.
2481

2482
    """
    steps_total = 6
2484

    
2485
    pnode = self.instance.primary_node
2486

    
2487
    # Step: check device activation
2488
    self.lu.LogStep(1, steps_total, "Check device existence")
2489
    self._CheckDisksExistence([self.instance.primary_node])
2490
    self._CheckVolumeGroup([self.instance.primary_node])
2491

    
2492
    # Step: check other node consistency
2493
    self.lu.LogStep(2, steps_total, "Check peer consistency")
2494
    self._CheckDisksConsistency(self.instance.primary_node, True, True)
2495

    
2496
    # Step: create new storage
2497
    self.lu.LogStep(3, steps_total, "Allocate new storage")
2498
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2499
    excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg,
2500
                                                  self.new_node_uuid)
2501
    for idx, dev in enumerate(disks):
2502
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
2503
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
2504
      # we pass force_create=True to force LVM creation
2505
      for new_lv in dev.children:
2506
        try:
2507
          _CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance,
2508
                               new_lv, True, GetInstanceInfoText(self.instance),
2509
                               False, excl_stor)
2510
        except errors.DeviceCreationError, e:
2511
          raise errors.OpExecError("Can't create block device: %s" % e.message)
2512

    
2513
    # Step 4: drbd minors and drbd setup changes
2514
    # after this, we must manually remove the drbd minors on both the
2515
    # error and the success paths
2516
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2517
    minors = self.cfg.AllocateDRBDMinor([self.new_node_uuid
2518
                                         for _ in self.instance.disks],
2519
                                        self.instance.uuid)
2520
    logging.debug("Allocated minors %r", minors)
2521

    
2522
    iv_names = {}
2523
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
2524
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
2525
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
2526
      # create new devices on new_node; note that we create two IDs:
2527
      # one without port, so the drbd will be activated without
2528
      # networking information on the new node at this stage, and one
2529
      # with network, for the latter activation in step 4
2530
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
2531
      if self.instance.primary_node == o_node1:
2532
        p_minor = o_minor1
2533
      else:
2534
        assert self.instance.primary_node == o_node2, "Three-node instance?"
2535
        p_minor = o_minor2
2536

    
2537
      new_alone_id = (self.instance.primary_node, self.new_node_uuid, None,
2538
                      p_minor, new_minor, o_secret)
2539
      new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port,
2540
                    p_minor, new_minor, o_secret)
2541

    
2542
      iv_names[idx] = (dev, dev.children, new_net_id)
2543
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
2544
                    new_net_id)
2545
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
2546
                              logical_id=new_alone_id,
2547
                              children=dev.children,
2548
                              size=dev.size,
2549
                              params={})
2550
      (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
2551
                                            self.cfg)
2552
      try:
2553
        CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance,
2554
                             anno_new_drbd,
2555
                             GetInstanceInfoText(self.instance), False,
2556
                             excl_stor)
2557
      except errors.GenericError:
2558
        self.cfg.ReleaseDRBDMinors(self.instance.uuid)
2559
        raise
2560

    
2561
    # We have new devices, shutdown the drbd on the old secondary
2562
    for idx, dev in enumerate(self.instance.disks):
2563
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
2564
      self.cfg.SetDiskID(dev, self.target_node_uuid)
2565
      msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid,
2566
                                            (dev, self.instance)).fail_msg
2567
      if msg:
2568
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
2569
                           "node: %s" % (idx, msg),
2570
                           hint=("Please cleanup this device manually as"
2571
                                 " soon as possible"))
2572

    
2573
    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2574
    result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
2575
                                               self.instance.disks)[pnode]
2576

    
2577
    msg = result.fail_msg
2578
    if msg:
2579
      # detaches didn't succeed (unlikely)
2580
      self.cfg.ReleaseDRBDMinors(self.instance.uuid)
2581
      raise errors.OpExecError("Can't detach the disks from the network on"
2582
                               " old node: %s" % (msg,))
2583

    
2584
    # if we managed to detach at least one, we update all the disks of
2585
    # the instance to point to the new secondary
2586
    self.lu.LogInfo("Updating instance configuration")
2587
    for dev, _, new_logical_id in iv_names.itervalues():
2588
      dev.logical_id = new_logical_id
2589
      self.cfg.SetDiskID(dev, self.instance.primary_node)
2590

    
2591
    self.cfg.Update(self.instance, feedback_fn)
2592

    
2593
    # Release all node locks (the configuration has been updated)
2594
    ReleaseLocks(self.lu, locking.LEVEL_NODE)
2595

    
2596
    # and now perform the drbd attach
2597
    self.lu.LogInfo("Attaching primary drbds to new secondary"
2598
                    " (standalone => connected)")
2599
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2600
                                            self.new_node_uuid],
2601
                                           self.node_secondary_ip,
2602
                                           (self.instance.disks, self.instance),
2603
                                           self.instance.name,
2604
                                           False)
2605
    for to_node, to_result in result.items():
2606
      msg = to_result.fail_msg
2607
      if msg:
2608
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
2609
                           self.cfg.GetNodeName(to_node), msg,
2610
                           hint=("please do a gnt-instance info to see the"
2611
                                 " status of disks"))
2612

    
2613
    cstep = itertools.count(5)
2614

    
2615
    if self.early_release:
2616
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2617
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
2618
      # TODO: Check if releasing locks early still makes sense
2619
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2620
    else:
2621
      # Release all resource locks except those used by the instance
2622
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2623
                   keep=self.node_secondary_ip.keys())
2624

    
2625
    # TODO: Can the instance lock be downgraded here? Take the optional disk
2626
    # shutdown in the caller into consideration.
2627

    
2628
    # Wait for sync
2629
    # This can fail as the old devices are degraded and _WaitForSync
2630
    # does a combined result over all disks, so we don't check its return value
2631
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2632
    WaitForSync(self.lu, self.instance)
2633

    
2634
    # Check all devices manually
2635
    self._CheckDevices(self.instance.primary_node, iv_names)
2636

    
2637
    # Step: remove old storage
2638
    if not self.early_release:
2639
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2640
      self._RemoveOldStorage(self.target_node_uuid, iv_names)