Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib / instance_storage.py @ c615590c

History | View | Annotate | Download (92.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with storage of instances."""
23

    
24
import itertools
25
import logging
26
import os
27
import time
28

    
29
from ganeti import compat
30
from ganeti import constants
31
from ganeti import errors
32
from ganeti import ht
33
from ganeti import locking
34
from ganeti.masterd import iallocator
35
from ganeti import objects
36
from ganeti import utils
37
from ganeti import opcodes
38
from ganeti import rpc
39
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
40
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
41
  AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeName, \
42
  CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
43
  IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks
44
from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
45
  CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
46
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
47

    
48
import ganeti.masterd.instance
49

    
50

    
51
_DISK_TEMPLATE_NAME_PREFIX = {
52
  constants.DT_PLAIN: "",
53
  constants.DT_RBD: ".rbd",
54
  constants.DT_EXT: ".ext",
55
  }
56

    
57

    
58
_DISK_TEMPLATE_DEVICE_TYPE = {
59
  constants.DT_PLAIN: constants.LD_LV,
60
  constants.DT_FILE: constants.LD_FILE,
61
  constants.DT_SHARED_FILE: constants.LD_FILE,
62
  constants.DT_BLOCK: constants.LD_BLOCKDEV,
63
  constants.DT_RBD: constants.LD_RBD,
64
  constants.DT_EXT: constants.LD_EXT,
65
  }
66

    
67

    
68
def CreateSingleBlockDev(lu, node, instance, device, info, force_open,
69
                         excl_stor):
70
  """Create a single block device on a given node.
71

72
  This will not recurse over children of the device, so they must be
73
  created in advance.
74

75
  @param lu: the lu on whose behalf we execute
76
  @param node: the node on which to create the device
77
  @type instance: L{objects.Instance}
78
  @param instance: the instance which owns the device
79
  @type device: L{objects.Disk}
80
  @param device: the device to create
81
  @param info: the extra 'metadata' we should attach to the device
82
      (this will be represented as a LVM tag)
83
  @type force_open: boolean
84
  @param force_open: this parameter will be passes to the
85
      L{backend.BlockdevCreate} function where it specifies
86
      whether we run on primary or not, and it affects both
87
      the child assembly and the device own Open() execution
88
  @type excl_stor: boolean
89
  @param excl_stor: Whether exclusive_storage is active for the node
90

91
  """
92
  lu.cfg.SetDiskID(device, node)
93
  result = lu.rpc.call_blockdev_create(node, device, device.size,
94
                                       instance.name, force_open, info,
95
                                       excl_stor)
96
  result.Raise("Can't create block device %s on"
97
               " node %s for instance %s" % (device, node, instance.name))
98
  if device.physical_id is None:
99
    device.physical_id = result.payload
100

    
101

    
102
def _CreateBlockDevInner(lu, node, instance, device, force_create,
103
                         info, force_open, excl_stor):
104
  """Create a tree of block devices on a given node.
105

106
  If this device type has to be created on secondaries, create it and
107
  all its children.
108

109
  If not, just recurse to children keeping the same 'force' value.
110

111
  @attention: The device has to be annotated already.
112

113
  @param lu: the lu on whose behalf we execute
114
  @param node: the node on which to create the device
115
  @type instance: L{objects.Instance}
116
  @param instance: the instance which owns the device
117
  @type device: L{objects.Disk}
118
  @param device: the device to create
119
  @type force_create: boolean
120
  @param force_create: whether to force creation of this device; this
121
      will be change to True whenever we find a device which has
122
      CreateOnSecondary() attribute
123
  @param info: the extra 'metadata' we should attach to the device
124
      (this will be represented as a LVM tag)
125
  @type force_open: boolean
126
  @param force_open: this parameter will be passes to the
127
      L{backend.BlockdevCreate} function where it specifies
128
      whether we run on primary or not, and it affects both
129
      the child assembly and the device own Open() execution
130
  @type excl_stor: boolean
131
  @param excl_stor: Whether exclusive_storage is active for the node
132

133
  @return: list of created devices
134
  """
135
  created_devices = []
136
  try:
137
    if device.CreateOnSecondary():
138
      force_create = True
139

    
140
    if device.children:
141
      for child in device.children:
142
        devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
143
                                    info, force_open, excl_stor)
144
        created_devices.extend(devs)
145

    
146
    if not force_create:
147
      return created_devices
148

    
149
    CreateSingleBlockDev(lu, node, instance, device, info, force_open,
150
                         excl_stor)
151
    # The device has been completely created, so there is no point in keeping
152
    # its subdevices in the list. We just add the device itself instead.
153
    created_devices = [(node, device)]
154
    return created_devices
155

    
156
  except errors.DeviceCreationError, e:
157
    e.created_devices.extend(created_devices)
158
    raise e
159
  except errors.OpExecError, e:
160
    raise errors.DeviceCreationError(str(e), created_devices)
161

    
162

    
163
def IsExclusiveStorageEnabledNodeName(cfg, nodename):
164
  """Whether exclusive_storage is in effect for the given node.
165

166
  @type cfg: L{config.ConfigWriter}
167
  @param cfg: The cluster configuration
168
  @type nodename: string
169
  @param nodename: The node
170
  @rtype: bool
171
  @return: The effective value of exclusive_storage
172
  @raise errors.OpPrereqError: if no node exists with the given name
173

174
  """
175
  ni = cfg.GetNodeInfo(nodename)
176
  if ni is None:
177
    raise errors.OpPrereqError("Invalid node name %s" % nodename,
178
                               errors.ECODE_NOENT)
179
  return IsExclusiveStorageEnabledNode(cfg, ni)
180

    
181

    
182
def _CreateBlockDev(lu, node, instance, device, force_create, info,
183
                    force_open):
184
  """Wrapper around L{_CreateBlockDevInner}.
185

186
  This method annotates the root device first.
187

188
  """
189
  (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
190
  excl_stor = IsExclusiveStorageEnabledNodeName(lu.cfg, node)
191
  return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
192
                              force_open, excl_stor)
193

    
194

    
195
def _UndoCreateDisks(lu, disks_created):
196
  """Undo the work performed by L{CreateDisks}.
197

198
  This function is called in case of an error to undo the work of
199
  L{CreateDisks}.
200

201
  @type lu: L{LogicalUnit}
202
  @param lu: the logical unit on whose behalf we execute
203
  @param disks_created: the result returned by L{CreateDisks}
204

205
  """
206
  for (node, disk) in disks_created:
207
    lu.cfg.SetDiskID(disk, node)
208
    result = lu.rpc.call_blockdev_remove(node, disk)
209
    if result.fail_msg:
210
      logging.warning("Failed to remove newly-created disk %s on node %s:"
211
                      " %s", disk, node, result.fail_msg)
212

    
213

    
214
def CreateDisks(lu, instance, to_skip=None, target_node=None, disks=None):
215
  """Create all disks for an instance.
216

217
  This abstracts away some work from AddInstance.
218

219
  @type lu: L{LogicalUnit}
220
  @param lu: the logical unit on whose behalf we execute
221
  @type instance: L{objects.Instance}
222
  @param instance: the instance whose disks we should create
223
  @type to_skip: list
224
  @param to_skip: list of indices to skip
225
  @type target_node: string
226
  @param target_node: if passed, overrides the target node for creation
227
  @type disks: list of {objects.Disk}
228
  @param disks: the disks to create; if not specified, all the disks of the
229
      instance are created
230
  @return: information about the created disks, to be used to call
231
      L{_UndoCreateDisks}
232
  @raise errors.OpPrereqError: in case of error
233

234
  """
235
  info = GetInstanceInfoText(instance)
236
  if target_node is None:
237
    pnode = instance.primary_node
238
    all_nodes = instance.all_nodes
239
  else:
240
    pnode = target_node
241
    all_nodes = [pnode]
242

    
243
  if disks is None:
244
    disks = instance.disks
245

    
246
  if instance.disk_template in constants.DTS_FILEBASED:
247
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
248
    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
249

    
250
    result.Raise("Failed to create directory '%s' on"
251
                 " node %s" % (file_storage_dir, pnode))
252

    
253
  disks_created = []
254
  for idx, device in enumerate(disks):
255
    if to_skip and idx in to_skip:
256
      continue
257
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
258
    for node in all_nodes:
259
      f_create = node == pnode
260
      try:
261
        _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
262
        disks_created.append((node, device))
263
      except errors.DeviceCreationError, e:
264
        logging.warning("Creating disk %s for instance '%s' failed",
265
                        idx, instance.name)
266
        disks_created.extend(e.created_devices)
267
        _UndoCreateDisks(lu, disks_created)
268
        raise errors.OpExecError(e.message)
269
  return disks_created
270

    
271

    
272
def ComputeDiskSizePerVG(disk_template, disks):
273
  """Compute disk size requirements in the volume group
274

275
  """
276
  def _compute(disks, payload):
277
    """Universal algorithm.
278

279
    """
280
    vgs = {}
281
    for disk in disks:
282
      vgs[disk[constants.IDISK_VG]] = \
283
        vgs.get(constants.IDISK_VG, 0) + disk[constants.IDISK_SIZE] + payload
284

    
285
    return vgs
286

    
287
  # Required free disk space as a function of disk and swap space
288
  req_size_dict = {
289
    constants.DT_DISKLESS: {},
290
    constants.DT_PLAIN: _compute(disks, 0),
291
    # 128 MB are added for drbd metadata for each disk
292
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
293
    constants.DT_FILE: {},
294
    constants.DT_SHARED_FILE: {},
295
    }
296

    
297
  if disk_template not in req_size_dict:
298
    raise errors.ProgrammerError("Disk template '%s' size requirement"
299
                                 " is unknown" % disk_template)
300

    
301
  return req_size_dict[disk_template]
302

    
303

    
304
def ComputeDisks(op, default_vg):
305
  """Computes the instance disks.
306

307
  @param op: The instance opcode
308
  @param default_vg: The default_vg to assume
309

310
  @return: The computed disks
311

312
  """
313
  disks = []
314
  for disk in op.disks:
315
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
316
    if mode not in constants.DISK_ACCESS_SET:
317
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
318
                                 mode, errors.ECODE_INVAL)
319
    size = disk.get(constants.IDISK_SIZE, None)
320
    if size is None:
321
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
322
    try:
323
      size = int(size)
324
    except (TypeError, ValueError):
325
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
326
                                 errors.ECODE_INVAL)
327

    
328
    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
329
    if ext_provider and op.disk_template != constants.DT_EXT:
330
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
331
                                 " disk template, not %s" %
332
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
333
                                  op.disk_template), errors.ECODE_INVAL)
334

    
335
    data_vg = disk.get(constants.IDISK_VG, default_vg)
336
    name = disk.get(constants.IDISK_NAME, None)
337
    if name is not None and name.lower() == constants.VALUE_NONE:
338
      name = None
339
    new_disk = {
340
      constants.IDISK_SIZE: size,
341
      constants.IDISK_MODE: mode,
342
      constants.IDISK_VG: data_vg,
343
      constants.IDISK_NAME: name,
344
      }
345

    
346
    if constants.IDISK_METAVG in disk:
347
      new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG]
348
    if constants.IDISK_ADOPT in disk:
349
      new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]
350

    
351
    # For extstorage, demand the `provider' option and add any
352
    # additional parameters (ext-params) to the dict
353
    if op.disk_template == constants.DT_EXT:
354
      if ext_provider:
355
        new_disk[constants.IDISK_PROVIDER] = ext_provider
356
        for key in disk:
357
          if key not in constants.IDISK_PARAMS:
358
            new_disk[key] = disk[key]
359
      else:
360
        raise errors.OpPrereqError("Missing provider for template '%s'" %
361
                                   constants.DT_EXT, errors.ECODE_INVAL)
362

    
363
    disks.append(new_disk)
364

    
365
  return disks
366

    
367

    
368
def CheckRADOSFreeSpace():
369
  """Compute disk size requirements inside the RADOS cluster.
370

371
  """
372
  # For the RADOS cluster we assume there is always enough space.
373
  pass
374

    
375

    
376
def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
377
                         iv_name, p_minor, s_minor):
378
  """Generate a drbd8 device complete with its children.
379

380
  """
381
  assert len(vgnames) == len(names) == 2
382
  port = lu.cfg.AllocatePort()
383
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
384

    
385
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
386
                          logical_id=(vgnames[0], names[0]),
387
                          params={})
388
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
389
  dev_meta = objects.Disk(dev_type=constants.LD_LV,
390
                          size=constants.DRBD_META_SIZE,
391
                          logical_id=(vgnames[1], names[1]),
392
                          params={})
393
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
394
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
395
                          logical_id=(primary, secondary, port,
396
                                      p_minor, s_minor,
397
                                      shared_secret),
398
                          children=[dev_data, dev_meta],
399
                          iv_name=iv_name, params={})
400
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
401
  return drbd_dev
402

    
403

    
404
def GenerateDiskTemplate(
405
  lu, template_name, instance_name, primary_node, secondary_nodes,
406
  disk_info, file_storage_dir, file_driver, base_index,
407
  feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
408
  _req_shr_file_storage=opcodes.RequireSharedFileStorage):
409
  """Generate the entire disk layout for a given template type.
410

411
  """
412
  vgname = lu.cfg.GetVGName()
413
  disk_count = len(disk_info)
414
  disks = []
415

    
416
  if template_name == constants.DT_DISKLESS:
417
    pass
418
  elif template_name == constants.DT_DRBD8:
419
    if len(secondary_nodes) != 1:
420
      raise errors.ProgrammerError("Wrong template configuration")
421
    remote_node = secondary_nodes[0]
422
    minors = lu.cfg.AllocateDRBDMinor(
423
      [primary_node, remote_node] * len(disk_info), instance_name)
424

    
425
    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
426
                                                       full_disk_params)
427
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
428

    
429
    names = []
430
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
431
                                               for i in range(disk_count)]):
432
      names.append(lv_prefix + "_data")
433
      names.append(lv_prefix + "_meta")
434
    for idx, disk in enumerate(disk_info):
435
      disk_index = idx + base_index
436
      data_vg = disk.get(constants.IDISK_VG, vgname)
437
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
438
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
439
                                      disk[constants.IDISK_SIZE],
440
                                      [data_vg, meta_vg],
441
                                      names[idx * 2:idx * 2 + 2],
442
                                      "disk/%d" % disk_index,
443
                                      minors[idx * 2], minors[idx * 2 + 1])
444
      disk_dev.mode = disk[constants.IDISK_MODE]
445
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
446
      disks.append(disk_dev)
447
  else:
448
    if secondary_nodes:
449
      raise errors.ProgrammerError("Wrong template configuration")
450

    
451
    if template_name == constants.DT_FILE:
452
      _req_file_storage()
453
    elif template_name == constants.DT_SHARED_FILE:
454
      _req_shr_file_storage()
455

    
456
    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
457
    if name_prefix is None:
458
      names = None
459
    else:
460
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
461
                                        (name_prefix, base_index + i)
462
                                        for i in range(disk_count)])
463

    
464
    if template_name == constants.DT_PLAIN:
465

    
466
      def logical_id_fn(idx, _, disk):
467
        vg = disk.get(constants.IDISK_VG, vgname)
468
        return (vg, names[idx])
469

    
470
    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
471
      logical_id_fn = \
472
        lambda _, disk_index, disk: (file_driver,
473
                                     "%s/disk%d" % (file_storage_dir,
474
                                                    disk_index))
475
    elif template_name == constants.DT_BLOCK:
476
      logical_id_fn = \
477
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
478
                                       disk[constants.IDISK_ADOPT])
479
    elif template_name == constants.DT_RBD:
480
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
481
    elif template_name == constants.DT_EXT:
482
      def logical_id_fn(idx, _, disk):
483
        provider = disk.get(constants.IDISK_PROVIDER, None)
484
        if provider is None:
485
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
486
                                       " not found", constants.DT_EXT,
487
                                       constants.IDISK_PROVIDER)
488
        return (provider, names[idx])
489
    else:
490
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
491

    
492
    dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
493

    
494
    for idx, disk in enumerate(disk_info):
495
      params = {}
496
      # Only for the Ext template add disk_info to params
497
      if template_name == constants.DT_EXT:
498
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
499
        for key in disk:
500
          if key not in constants.IDISK_PARAMS:
501
            params[key] = disk[key]
502
      disk_index = idx + base_index
503
      size = disk[constants.IDISK_SIZE]
504
      feedback_fn("* disk %s, size %s" %
505
                  (disk_index, utils.FormatUnit(size, "h")))
506
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
507
                              logical_id=logical_id_fn(idx, disk_index, disk),
508
                              iv_name="disk/%d" % disk_index,
509
                              mode=disk[constants.IDISK_MODE],
510
                              params=params)
511
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
512
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
513
      disks.append(disk_dev)
514

    
515
  return disks
516

    
517

    
518
class LUInstanceRecreateDisks(LogicalUnit):
519
  """Recreate an instance's missing disks.
520

521
  """
522
  HPATH = "instance-recreate-disks"
523
  HTYPE = constants.HTYPE_INSTANCE
524
  REQ_BGL = False
525

    
526
  _MODIFYABLE = compat.UniqueFrozenset([
527
    constants.IDISK_SIZE,
528
    constants.IDISK_MODE,
529
    constants.IDISK_SPINDLES,
530
    ])
531

    
532
  # New or changed disk parameters may have different semantics
533
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
534
    constants.IDISK_ADOPT,
535

    
536
    # TODO: Implement support changing VG while recreating
537
    constants.IDISK_VG,
538
    constants.IDISK_METAVG,
539
    constants.IDISK_PROVIDER,
540
    constants.IDISK_NAME,
541
    ]))
542

    
543
  def _RunAllocator(self):
544
    """Run the allocator based on input opcode.
545

546
    """
547
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
548

    
549
    # FIXME
550
    # The allocator should actually run in "relocate" mode, but current
551
    # allocators don't support relocating all the nodes of an instance at
552
    # the same time. As a workaround we use "allocate" mode, but this is
553
    # suboptimal for two reasons:
554
    # - The instance name passed to the allocator is present in the list of
555
    #   existing instances, so there could be a conflict within the
556
    #   internal structures of the allocator. This doesn't happen with the
557
    #   current allocators, but it's a liability.
558
    # - The allocator counts the resources used by the instance twice: once
559
    #   because the instance exists already, and once because it tries to
560
    #   allocate a new instance.
561
    # The allocator could choose some of the nodes on which the instance is
562
    # running, but that's not a problem. If the instance nodes are broken,
563
    # they should be already be marked as drained or offline, and hence
564
    # skipped by the allocator. If instance disks have been lost for other
565
    # reasons, then recreating the disks on the same nodes should be fine.
566
    disk_template = self.instance.disk_template
567
    spindle_use = be_full[constants.BE_SPINDLE_USE]
568
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
569
                                        disk_template=disk_template,
570
                                        tags=list(self.instance.GetTags()),
571
                                        os=self.instance.os,
572
                                        nics=[{}],
573
                                        vcpus=be_full[constants.BE_VCPUS],
574
                                        memory=be_full[constants.BE_MAXMEM],
575
                                        spindle_use=spindle_use,
576
                                        disks=[{constants.IDISK_SIZE: d.size,
577
                                                constants.IDISK_MODE: d.mode}
578
                                               for d in self.instance.disks],
579
                                        hypervisor=self.instance.hypervisor,
580
                                        node_whitelist=None)
581
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
582

    
583
    ial.Run(self.op.iallocator)
584

    
585
    assert req.RequiredNodes() == len(self.instance.all_nodes)
586

    
587
    if not ial.success:
588
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
589
                                 " %s" % (self.op.iallocator, ial.info),
590
                                 errors.ECODE_NORES)
591

    
592
    self.op.nodes = ial.result
593
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
594
                 self.op.instance_name, self.op.iallocator,
595
                 utils.CommaJoin(ial.result))
596

    
597
  def CheckArguments(self):
598
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
599
      # Normalize and convert deprecated list of disk indices
600
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
601

    
602
    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
603
    if duplicates:
604
      raise errors.OpPrereqError("Some disks have been specified more than"
605
                                 " once: %s" % utils.CommaJoin(duplicates),
606
                                 errors.ECODE_INVAL)
607

    
608
    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
609
    # when neither iallocator nor nodes are specified
610
    if self.op.iallocator or self.op.nodes:
611
      CheckIAllocatorOrNode(self, "iallocator", "nodes")
612

    
613
    for (idx, params) in self.op.disks:
614
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
615
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
616
      if unsupported:
617
        raise errors.OpPrereqError("Parameters for disk %s try to change"
618
                                   " unmodifyable parameter(s): %s" %
619
                                   (idx, utils.CommaJoin(unsupported)),
620
                                   errors.ECODE_INVAL)
621

    
622
  def ExpandNames(self):
623
    self._ExpandAndLockInstance()
624
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
625

    
626
    if self.op.nodes:
627
      self.op.nodes = [ExpandNodeName(self.cfg, n) for n in self.op.nodes]
628
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
629
    else:
630
      self.needed_locks[locking.LEVEL_NODE] = []
631
      if self.op.iallocator:
632
        # iallocator will select a new node in the same group
633
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
634
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
635

    
636
    self.needed_locks[locking.LEVEL_NODE_RES] = []
637

    
638
  def DeclareLocks(self, level):
639
    if level == locking.LEVEL_NODEGROUP:
640
      assert self.op.iallocator is not None
641
      assert not self.op.nodes
642
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
643
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
644
      # Lock the primary group used by the instance optimistically; this
645
      # requires going via the node before it's locked, requiring
646
      # verification later on
647
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
648
        self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)
649

    
650
    elif level == locking.LEVEL_NODE:
651
      # If an allocator is used, then we lock all the nodes in the current
652
      # instance group, as we don't know yet which ones will be selected;
653
      # if we replace the nodes without using an allocator, locks are
654
      # already declared in ExpandNames; otherwise, we need to lock all the
655
      # instance nodes for disk re-creation
656
      if self.op.iallocator:
657
        assert not self.op.nodes
658
        assert not self.needed_locks[locking.LEVEL_NODE]
659
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
660

    
661
        # Lock member nodes of the group of the primary node
662
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
663
          self.needed_locks[locking.LEVEL_NODE].extend(
664
            self.cfg.GetNodeGroup(group_uuid).members)
665

    
666
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
667
      elif not self.op.nodes:
668
        self._LockInstancesNodes(primary_only=False)
669
    elif level == locking.LEVEL_NODE_RES:
670
      # Copy node locks
671
      self.needed_locks[locking.LEVEL_NODE_RES] = \
672
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])
673

    
674
  def BuildHooksEnv(self):
675
    """Build hooks env.
676

677
    This runs on master, primary and secondary nodes of the instance.
678

679
    """
680
    return BuildInstanceHookEnvByObject(self, self.instance)
681

    
682
  def BuildHooksNodes(self):
683
    """Build hooks nodes.
684

685
    """
686
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
687
    return (nl, nl)
688

    
689
  def CheckPrereq(self):
690
    """Check prerequisites.
691

692
    This checks that the instance is in the cluster and is not running.
693

694
    """
695
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
696
    assert instance is not None, \
697
      "Cannot retrieve locked instance %s" % self.op.instance_name
698
    if self.op.nodes:
699
      if len(self.op.nodes) != len(instance.all_nodes):
700
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
701
                                   " %d replacement nodes were specified" %
702
                                   (instance.name, len(instance.all_nodes),
703
                                    len(self.op.nodes)),
704
                                   errors.ECODE_INVAL)
705
      assert instance.disk_template != constants.DT_DRBD8 or \
706
             len(self.op.nodes) == 2
707
      assert instance.disk_template != constants.DT_PLAIN or \
708
             len(self.op.nodes) == 1
709
      primary_node = self.op.nodes[0]
710
    else:
711
      primary_node = instance.primary_node
712
    if not self.op.iallocator:
713
      CheckNodeOnline(self, primary_node)
714

    
715
    if instance.disk_template == constants.DT_DISKLESS:
716
      raise errors.OpPrereqError("Instance '%s' has no disks" %
717
                                 self.op.instance_name, errors.ECODE_INVAL)
718

    
719
    # Verify if node group locks are still correct
720
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
721
    if owned_groups:
722
      # Node group locks are acquired only for the primary node (and only
723
      # when the allocator is used)
724
      CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
725
                              primary_only=True)
726

    
727
    # if we replace nodes *and* the old primary is offline, we don't
728
    # check the instance state
729
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
730
    if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
731
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
732
                         msg="cannot recreate disks")
733

    
734
    if self.op.disks:
735
      self.disks = dict(self.op.disks)
736
    else:
737
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
738

    
739
    maxidx = max(self.disks.keys())
740
    if maxidx >= len(instance.disks):
741
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
742
                                 errors.ECODE_INVAL)
743

    
744
    if ((self.op.nodes or self.op.iallocator) and
745
         sorted(self.disks.keys()) != range(len(instance.disks))):
746
      raise errors.OpPrereqError("Can't recreate disks partially and"
747
                                 " change the nodes at the same time",
748
                                 errors.ECODE_INVAL)
749

    
750
    self.instance = instance
751

    
752
    if self.op.iallocator:
753
      self._RunAllocator()
754
      # Release unneeded node and node resource locks
755
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
756
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
757
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
758

    
759
    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
760

    
761
  def Exec(self, feedback_fn):
762
    """Recreate the disks.
763

764
    """
765
    instance = self.instance
766

    
767
    assert (self.owned_locks(locking.LEVEL_NODE) ==
768
            self.owned_locks(locking.LEVEL_NODE_RES))
769

    
770
    to_skip = []
771
    mods = [] # keeps track of needed changes
772

    
773
    for idx, disk in enumerate(instance.disks):
774
      try:
775
        changes = self.disks[idx]
776
      except KeyError:
777
        # Disk should not be recreated
778
        to_skip.append(idx)
779
        continue
780

    
781
      # update secondaries for disks, if needed
782
      if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
783
        # need to update the nodes and minors
784
        assert len(self.op.nodes) == 2
785
        assert len(disk.logical_id) == 6 # otherwise disk internals
786
                                         # have changed
787
        (_, _, old_port, _, _, old_secret) = disk.logical_id
788
        new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
789
        new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
790
                  new_minors[0], new_minors[1], old_secret)
791
        assert len(disk.logical_id) == len(new_id)
792
      else:
793
        new_id = None
794

    
795
      mods.append((idx, new_id, changes))
796

    
797
    # now that we have passed all asserts above, we can apply the mods
798
    # in a single run (to avoid partial changes)
799
    for idx, new_id, changes in mods:
800
      disk = instance.disks[idx]
801
      if new_id is not None:
802
        assert disk.dev_type == constants.LD_DRBD8
803
        disk.logical_id = new_id
804
      if changes:
805
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
806
                    mode=changes.get(constants.IDISK_MODE, None))
807

    
808
    # change primary node, if needed
809
    if self.op.nodes:
810
      instance.primary_node = self.op.nodes[0]
811
      self.LogWarning("Changing the instance's nodes, you will have to"
812
                      " remove any disks left on the older nodes manually")
813

    
814
    if self.op.nodes:
815
      self.cfg.Update(instance, feedback_fn)
816

    
817
    # All touched nodes must be locked
818
    mylocks = self.owned_locks(locking.LEVEL_NODE)
819
    assert mylocks.issuperset(frozenset(instance.all_nodes))
820
    new_disks = CreateDisks(self, instance, to_skip=to_skip)
821

    
822
    # TODO: Release node locks before wiping, or explain why it's not possible
823
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
824
      wipedisks = [(idx, disk, 0)
825
                   for (idx, disk) in enumerate(instance.disks)
826
                   if idx not in to_skip]
827
      WipeOrCleanupDisks(self, instance, disks=wipedisks, cleanup=new_disks)
828

    
829

    
830
def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
831
  """Checks if nodes have enough free disk space in the specified VG.
832

833
  This function checks if all given nodes have the needed amount of
834
  free disk. In case any node has less disk or we cannot get the
835
  information from the node, this function raises an OpPrereqError
836
  exception.
837

838
  @type lu: C{LogicalUnit}
839
  @param lu: a logical unit from which we get configuration data
840
  @type nodenames: C{list}
841
  @param nodenames: the list of node names to check
842
  @type vg: C{str}
843
  @param vg: the volume group to check
844
  @type requested: C{int}
845
  @param requested: the amount of disk in MiB to check for
846
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
847
      or we cannot check the node
848

849
  """
850
  es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
851
  # FIXME: This maps everything to storage type 'lvm-vg' to maintain
852
  # the current functionality. Refactor to make it more flexible.
853
  nodeinfo = lu.rpc.call_node_info(nodenames, [(constants.ST_LVM_VG, vg)], None,
854
                                   es_flags)
855
  for node in nodenames:
856
    info = nodeinfo[node]
857
    info.Raise("Cannot get current information from node %s" % node,
858
               prereq=True, ecode=errors.ECODE_ENVIRON)
859
    (_, (vg_info, ), _) = info.payload
860
    vg_free = vg_info.get("vg_free", None)
861
    if not isinstance(vg_free, int):
862
      raise errors.OpPrereqError("Can't compute free disk space on node"
863
                                 " %s for vg %s, result was '%s'" %
864
                                 (node, vg, vg_free), errors.ECODE_ENVIRON)
865
    if requested > vg_free:
866
      raise errors.OpPrereqError("Not enough disk space on target node %s"
867
                                 " vg %s: required %d MiB, available %d MiB" %
868
                                 (node, vg, requested, vg_free),
869
                                 errors.ECODE_NORES)
870

    
871

    
872
def CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
873
  """Checks if nodes have enough free disk space in all the VGs.
874

875
  This function checks if all given nodes have the needed amount of
876
  free disk. In case any node has less disk or we cannot get the
877
  information from the node, this function raises an OpPrereqError
878
  exception.
879

880
  @type lu: C{LogicalUnit}
881
  @param lu: a logical unit from which we get configuration data
882
  @type nodenames: C{list}
883
  @param nodenames: the list of node names to check
884
  @type req_sizes: C{dict}
885
  @param req_sizes: the hash of vg and corresponding amount of disk in
886
      MiB to check for
887
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
888
      or we cannot check the node
889

890
  """
891
  for vg, req_size in req_sizes.items():
892
    _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
893

    
894

    
895
def _DiskSizeInBytesToMebibytes(lu, size):
896
  """Converts a disk size in bytes to mebibytes.
897

898
  Warns and rounds up if the size isn't an even multiple of 1 MiB.
899

900
  """
901
  (mib, remainder) = divmod(size, 1024 * 1024)
902

    
903
  if remainder != 0:
904
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
905
                  " to not overwrite existing data (%s bytes will not be"
906
                  " wiped)", (1024 * 1024) - remainder)
907
    mib += 1
908

    
909
  return mib
910

    
911

    
912
def _CalcEta(time_taken, written, total_size):
913
  """Calculates the ETA based on size written and total size.
914

915
  @param time_taken: The time taken so far
916
  @param written: amount written so far
917
  @param total_size: The total size of data to be written
918
  @return: The remaining time in seconds
919

920
  """
921
  avg_time = time_taken / float(written)
922
  return (total_size - written) * avg_time
923

    
924

    
925
def WipeDisks(lu, instance, disks=None):
926
  """Wipes instance disks.
927

928
  @type lu: L{LogicalUnit}
929
  @param lu: the logical unit on whose behalf we execute
930
  @type instance: L{objects.Instance}
931
  @param instance: the instance whose disks we should create
932
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
933
  @param disks: Disk details; tuple contains disk index, disk object and the
934
    start offset
935

936
  """
937
  node = instance.primary_node
938

    
939
  if disks is None:
940
    disks = [(idx, disk, 0)
941
             for (idx, disk) in enumerate(instance.disks)]
942

    
943
  for (_, device, _) in disks:
944
    lu.cfg.SetDiskID(device, node)
945

    
946
  logging.info("Pausing synchronization of disks of instance '%s'",
947
               instance.name)
948
  result = lu.rpc.call_blockdev_pause_resume_sync(node,
949
                                                  (map(compat.snd, disks),
950
                                                   instance),
951
                                                  True)
952
  result.Raise("Failed to pause disk synchronization on node '%s'" % node)
953

    
954
  for idx, success in enumerate(result.payload):
955
    if not success:
956
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
957
                   " failed", idx, instance.name)
958

    
959
  try:
960
    for (idx, device, offset) in disks:
961
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
962
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
963
      wipe_chunk_size = \
964
        int(min(constants.MAX_WIPE_CHUNK,
965
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
966

    
967
      size = device.size
968
      last_output = 0
969
      start_time = time.time()
970

    
971
      if offset == 0:
972
        info_text = ""
973
      else:
974
        info_text = (" (from %s to %s)" %
975
                     (utils.FormatUnit(offset, "h"),
976
                      utils.FormatUnit(size, "h")))
977

    
978
      lu.LogInfo("* Wiping disk %s%s", idx, info_text)
979

    
980
      logging.info("Wiping disk %d for instance %s on node %s using"
981
                   " chunk size %s", idx, instance.name, node, wipe_chunk_size)
982

    
983
      while offset < size:
984
        wipe_size = min(wipe_chunk_size, size - offset)
985

    
986
        logging.debug("Wiping disk %d, offset %s, chunk %s",
987
                      idx, offset, wipe_size)
988

    
989
        result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
990
                                           wipe_size)
991
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
992
                     (idx, offset, wipe_size))
993

    
994
        now = time.time()
995
        offset += wipe_size
996
        if now - last_output >= 60:
997
          eta = _CalcEta(now - start_time, offset, size)
998
          lu.LogInfo(" - done: %.1f%% ETA: %s",
999
                     offset / float(size) * 100, utils.FormatSeconds(eta))
1000
          last_output = now
1001
  finally:
1002
    logging.info("Resuming synchronization of disks for instance '%s'",
1003
                 instance.name)
1004

    
1005
    result = lu.rpc.call_blockdev_pause_resume_sync(node,
1006
                                                    (map(compat.snd, disks),
1007
                                                     instance),
1008
                                                    False)
1009

    
1010
    if result.fail_msg:
1011
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
1012
                    node, result.fail_msg)
1013
    else:
1014
      for idx, success in enumerate(result.payload):
1015
        if not success:
1016
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
1017
                        " failed", idx, instance.name)
1018

    
1019

    
1020
def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
1021
  """Wrapper for L{WipeDisks} that handles errors.
1022

1023
  @type lu: L{LogicalUnit}
1024
  @param lu: the logical unit on whose behalf we execute
1025
  @type instance: L{objects.Instance}
1026
  @param instance: the instance whose disks we should wipe
1027
  @param disks: see L{WipeDisks}
1028
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
1029
      case of error
1030
  @raise errors.OpPrereqError: in case of failure
1031

1032
  """
1033
  try:
1034
    WipeDisks(lu, instance, disks=disks)
1035
  except errors.OpExecError:
1036
    logging.warning("Wiping disks for instance '%s' failed",
1037
                    instance.name)
1038
    _UndoCreateDisks(lu, cleanup)
1039
    raise
1040

    
1041

    
1042
def ExpandCheckDisks(instance, disks):
1043
  """Return the instance disks selected by the disks list
1044

1045
  @type disks: list of L{objects.Disk} or None
1046
  @param disks: selected disks
1047
  @rtype: list of L{objects.Disk}
1048
  @return: selected instance disks to act on
1049

1050
  """
1051
  if disks is None:
1052
    return instance.disks
1053
  else:
1054
    if not set(disks).issubset(instance.disks):
1055
      raise errors.ProgrammerError("Can only act on disks belonging to the"
1056
                                   " target instance")
1057
    return disks
1058

    
1059

    
1060
def WaitForSync(lu, instance, disks=None, oneshot=False):
1061
  """Sleep and poll for an instance's disk to sync.
1062

1063
  """
1064
  if not instance.disks or disks is not None and not disks:
1065
    return True
1066

    
1067
  disks = ExpandCheckDisks(instance, disks)
1068

    
1069
  if not oneshot:
1070
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1071

    
1072
  node = instance.primary_node
1073

    
1074
  for dev in disks:
1075
    lu.cfg.SetDiskID(dev, node)
1076

    
1077
  # TODO: Convert to utils.Retry
1078

    
1079
  retries = 0
1080
  degr_retries = 10 # in seconds, as we sleep 1 second each time
1081
  while True:
1082
    max_time = 0
1083
    done = True
1084
    cumul_degraded = False
1085
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
1086
    msg = rstats.fail_msg
1087
    if msg:
1088
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1089
      retries += 1
1090
      if retries >= 10:
1091
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1092
                                 " aborting." % node)
1093
      time.sleep(6)
1094
      continue
1095
    rstats = rstats.payload
1096
    retries = 0
1097
    for i, mstat in enumerate(rstats):
1098
      if mstat is None:
1099
        lu.LogWarning("Can't compute data for node %s/%s",
1100
                      node, disks[i].iv_name)
1101
        continue
1102

    
1103
      cumul_degraded = (cumul_degraded or
1104
                        (mstat.is_degraded and mstat.sync_percent is None))
1105
      if mstat.sync_percent is not None:
1106
        done = False
1107
        if mstat.estimated_time is not None:
1108
          rem_time = ("%s remaining (estimated)" %
1109
                      utils.FormatSeconds(mstat.estimated_time))
1110
          max_time = mstat.estimated_time
1111
        else:
1112
          rem_time = "no time estimate"
1113
        lu.LogInfo("- device %s: %5.2f%% done, %s",
1114
                   disks[i].iv_name, mstat.sync_percent, rem_time)
1115

    
1116
    # if we're done but degraded, let's do a few small retries, to
1117
    # make sure we see a stable and not transient situation; therefore
1118
    # we force restart of the loop
1119
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
1120
      logging.info("Degraded disks found, %d retries left", degr_retries)
1121
      degr_retries -= 1
1122
      time.sleep(1)
1123
      continue
1124

    
1125
    if done or oneshot:
1126
      break
1127

    
1128
    time.sleep(min(60, max_time))
1129

    
1130
  if done:
1131
    lu.LogInfo("Instance %s's disks are in sync", instance.name)
1132

    
1133
  return not cumul_degraded
1134

    
1135

    
1136
def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1137
  """Shutdown block devices of an instance.
1138

1139
  This does the shutdown on all nodes of the instance.
1140

1141
  If the ignore_primary is false, errors on the primary node are
1142
  ignored.
1143

1144
  """
1145
  all_result = True
1146
  disks = ExpandCheckDisks(instance, disks)
1147

    
1148
  for disk in disks:
1149
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1150
      lu.cfg.SetDiskID(top_disk, node)
1151
      result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
1152
      msg = result.fail_msg
1153
      if msg:
1154
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1155
                      disk.iv_name, node, msg)
1156
        if ((node == instance.primary_node and not ignore_primary) or
1157
            (node != instance.primary_node and not result.offline)):
1158
          all_result = False
1159
  return all_result
1160

    
1161

    
1162
def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1163
  """Shutdown block devices of an instance.
1164

1165
  This function checks if an instance is running, before calling
1166
  _ShutdownInstanceDisks.
1167

1168
  """
1169
  CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1170
  ShutdownInstanceDisks(lu, instance, disks=disks)
1171

    
1172

    
1173
def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1174
                           ignore_size=False):
1175
  """Prepare the block devices for an instance.
1176

1177
  This sets up the block devices on all nodes.
1178

1179
  @type lu: L{LogicalUnit}
1180
  @param lu: the logical unit on whose behalf we execute
1181
  @type instance: L{objects.Instance}
1182
  @param instance: the instance for whose disks we assemble
1183
  @type disks: list of L{objects.Disk} or None
1184
  @param disks: which disks to assemble (or all, if None)
1185
  @type ignore_secondaries: boolean
1186
  @param ignore_secondaries: if true, errors on secondary nodes
1187
      won't result in an error return from the function
1188
  @type ignore_size: boolean
1189
  @param ignore_size: if true, the current known size of the disk
1190
      will not be used during the disk activation, useful for cases
1191
      when the size is wrong
1192
  @return: False if the operation failed, otherwise a list of
1193
      (host, instance_visible_name, node_visible_name)
1194
      with the mapping from node devices to instance devices
1195

1196
  """
1197
  device_info = []
1198
  disks_ok = True
1199
  iname = instance.name
1200
  disks = ExpandCheckDisks(instance, disks)
1201

    
1202
  # With the two passes mechanism we try to reduce the window of
1203
  # opportunity for the race condition of switching DRBD to primary
1204
  # before handshaking occured, but we do not eliminate it
1205

    
1206
  # The proper fix would be to wait (with some limits) until the
1207
  # connection has been made and drbd transitions from WFConnection
1208
  # into any other network-connected state (Connected, SyncTarget,
1209
  # SyncSource, etc.)
1210

    
1211
  # 1st pass, assemble on all nodes in secondary mode
1212
  for idx, inst_disk in enumerate(disks):
1213
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1214
      if ignore_size:
1215
        node_disk = node_disk.Copy()
1216
        node_disk.UnsetSize()
1217
      lu.cfg.SetDiskID(node_disk, node)
1218
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1219
                                             False, idx)
1220
      msg = result.fail_msg
1221
      if msg:
1222
        is_offline_secondary = (node in instance.secondary_nodes and
1223
                                result.offline)
1224
        lu.LogWarning("Could not prepare block device %s on node %s"
1225
                      " (is_primary=False, pass=1): %s",
1226
                      inst_disk.iv_name, node, msg)
1227
        if not (ignore_secondaries or is_offline_secondary):
1228
          disks_ok = False
1229

    
1230
  # FIXME: race condition on drbd migration to primary
1231

    
1232
  # 2nd pass, do only the primary node
1233
  for idx, inst_disk in enumerate(disks):
1234
    dev_path = None
1235

    
1236
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1237
      if node != instance.primary_node:
1238
        continue
1239
      if ignore_size:
1240
        node_disk = node_disk.Copy()
1241
        node_disk.UnsetSize()
1242
      lu.cfg.SetDiskID(node_disk, node)
1243
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1244
                                             True, idx)
1245
      msg = result.fail_msg
1246
      if msg:
1247
        lu.LogWarning("Could not prepare block device %s on node %s"
1248
                      " (is_primary=True, pass=2): %s",
1249
                      inst_disk.iv_name, node, msg)
1250
        disks_ok = False
1251
      else:
1252
        dev_path = result.payload
1253

    
1254
    device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
1255

    
1256
  # leave the disks configured for the primary node
1257
  # this is a workaround that would be fixed better by
1258
  # improving the logical/physical id handling
1259
  for disk in disks:
1260
    lu.cfg.SetDiskID(disk, instance.primary_node)
1261

    
1262
  return disks_ok, device_info
1263

    
1264

    
1265
def StartInstanceDisks(lu, instance, force):
1266
  """Start the disks of an instance.
1267

1268
  """
1269
  disks_ok, _ = AssembleInstanceDisks(lu, instance,
1270
                                      ignore_secondaries=force)
1271
  if not disks_ok:
1272
    ShutdownInstanceDisks(lu, instance)
1273
    if force is not None and not force:
1274
      lu.LogWarning("",
1275
                    hint=("If the message above refers to a secondary node,"
1276
                          " you can retry the operation using '--force'"))
1277
    raise errors.OpExecError("Disk consistency error")
1278

    
1279

    
1280
class LUInstanceGrowDisk(LogicalUnit):
1281
  """Grow a disk of an instance.
1282

1283
  """
1284
  HPATH = "disk-grow"
1285
  HTYPE = constants.HTYPE_INSTANCE
1286
  REQ_BGL = False
1287

    
1288
  def ExpandNames(self):
1289
    self._ExpandAndLockInstance()
1290
    self.needed_locks[locking.LEVEL_NODE] = []
1291
    self.needed_locks[locking.LEVEL_NODE_RES] = []
1292
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1293
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
1294

    
1295
  def DeclareLocks(self, level):
1296
    if level == locking.LEVEL_NODE:
1297
      self._LockInstancesNodes()
1298
    elif level == locking.LEVEL_NODE_RES:
1299
      # Copy node locks
1300
      self.needed_locks[locking.LEVEL_NODE_RES] = \
1301
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1302

    
1303
  def BuildHooksEnv(self):
1304
    """Build hooks env.
1305

1306
    This runs on the master, the primary and all the secondaries.
1307

1308
    """
1309
    env = {
1310
      "DISK": self.op.disk,
1311
      "AMOUNT": self.op.amount,
1312
      "ABSOLUTE": self.op.absolute,
1313
      }
1314
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
1315
    return env
1316

    
1317
  def BuildHooksNodes(self):
1318
    """Build hooks nodes.
1319

1320
    """
1321
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1322
    return (nl, nl)
1323

    
1324
  def CheckPrereq(self):
1325
    """Check prerequisites.
1326

1327
    This checks that the instance is in the cluster.
1328

1329
    """
1330
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1331
    assert instance is not None, \
1332
      "Cannot retrieve locked instance %s" % self.op.instance_name
1333
    nodenames = list(instance.all_nodes)
1334
    for node in nodenames:
1335
      CheckNodeOnline(self, node)
1336

    
1337
    self.instance = instance
1338

    
1339
    if instance.disk_template not in constants.DTS_GROWABLE:
1340
      raise errors.OpPrereqError("Instance's disk layout does not support"
1341
                                 " growing", errors.ECODE_INVAL)
1342

    
1343
    self.disk = instance.FindDisk(self.op.disk)
1344

    
1345
    if self.op.absolute:
1346
      self.target = self.op.amount
1347
      self.delta = self.target - self.disk.size
1348
      if self.delta < 0:
1349
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
1350
                                   "current disk size (%s)" %
1351
                                   (utils.FormatUnit(self.target, "h"),
1352
                                    utils.FormatUnit(self.disk.size, "h")),
1353
                                   errors.ECODE_STATE)
1354
    else:
1355
      self.delta = self.op.amount
1356
      self.target = self.disk.size + self.delta
1357
      if self.delta < 0:
1358
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
1359
                                   utils.FormatUnit(self.delta, "h"),
1360
                                   errors.ECODE_INVAL)
1361

    
1362
    self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))
1363

    
1364
  def _CheckDiskSpace(self, nodenames, req_vgspace):
1365
    template = self.instance.disk_template
1366
    if template not in (constants.DTS_NO_FREE_SPACE_CHECK):
1367
      # TODO: check the free disk space for file, when that feature will be
1368
      # supported
1369
      nodes = map(self.cfg.GetNodeInfo, nodenames)
1370
      es_nodes = filter(lambda n: IsExclusiveStorageEnabledNode(self.cfg, n),
1371
                        nodes)
1372
      if es_nodes:
1373
        # With exclusive storage we need to something smarter than just looking
1374
        # at free space; for now, let's simply abort the operation.
1375
        raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
1376
                                   " is enabled", errors.ECODE_STATE)
1377
      CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)
1378

    
1379
  def Exec(self, feedback_fn):
1380
    """Execute disk grow.
1381

1382
    """
1383
    instance = self.instance
1384
    disk = self.disk
1385

    
1386
    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1387
    assert (self.owned_locks(locking.LEVEL_NODE) ==
1388
            self.owned_locks(locking.LEVEL_NODE_RES))
1389

    
1390
    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1391

    
1392
    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[disk])
1393
    if not disks_ok:
1394
      raise errors.OpExecError("Cannot activate block device to grow")
1395

    
1396
    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1397
                (self.op.disk, instance.name,
1398
                 utils.FormatUnit(self.delta, "h"),
1399
                 utils.FormatUnit(self.target, "h")))
1400

    
1401
    # First run all grow ops in dry-run mode
1402
    for node in instance.all_nodes:
1403
      self.cfg.SetDiskID(disk, node)
1404
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1405
                                           True, True)
1406
      result.Raise("Dry-run grow request failed to node %s" % node)
1407

    
1408
    if wipe_disks:
1409
      # Get disk size from primary node for wiping
1410
      result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk])
1411
      result.Raise("Failed to retrieve disk size from node '%s'" %
1412
                   instance.primary_node)
1413

    
1414
      (disk_size_in_bytes, ) = result.payload
1415

    
1416
      if disk_size_in_bytes is None:
1417
        raise errors.OpExecError("Failed to retrieve disk size from primary"
1418
                                 " node '%s'" % instance.primary_node)
1419

    
1420
      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
1421

    
1422
      assert old_disk_size >= disk.size, \
1423
        ("Retrieved disk size too small (got %s, should be at least %s)" %
1424
         (old_disk_size, disk.size))
1425
    else:
1426
      old_disk_size = None
1427

    
1428
    # We know that (as far as we can test) operations across different
1429
    # nodes will succeed, time to run it for real on the backing storage
1430
    for node in instance.all_nodes:
1431
      self.cfg.SetDiskID(disk, node)
1432
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1433
                                           False, True)
1434
      result.Raise("Grow request failed to node %s" % node)
1435

    
1436
    # And now execute it for logical storage, on the primary node
1437
    node = instance.primary_node
1438
    self.cfg.SetDiskID(disk, node)
1439
    result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1440
                                         False, False)
1441
    result.Raise("Grow request failed to node %s" % node)
1442

    
1443
    disk.RecordGrow(self.delta)
1444
    self.cfg.Update(instance, feedback_fn)
1445

    
1446
    # Changes have been recorded, release node lock
1447
    ReleaseLocks(self, locking.LEVEL_NODE)
1448

    
1449
    # Downgrade lock while waiting for sync
1450
    self.glm.downgrade(locking.LEVEL_INSTANCE)
1451

    
1452
    assert wipe_disks ^ (old_disk_size is None)
1453

    
1454
    if wipe_disks:
1455
      assert instance.disks[self.op.disk] == disk
1456

    
1457
      # Wipe newly added disk space
1458
      WipeDisks(self, instance,
1459
                disks=[(self.op.disk, disk, old_disk_size)])
1460

    
1461
    if self.op.wait_for_sync:
1462
      disk_abort = not WaitForSync(self, instance, disks=[disk])
1463
      if disk_abort:
1464
        self.LogWarning("Disk syncing has not returned a good status; check"
1465
                        " the instance")
1466
      if instance.admin_state != constants.ADMINST_UP:
1467
        _SafeShutdownInstanceDisks(self, instance, disks=[disk])
1468
    elif instance.admin_state != constants.ADMINST_UP:
1469
      self.LogWarning("Not shutting down the disk even if the instance is"
1470
                      " not supposed to be running because no wait for"
1471
                      " sync mode was requested")
1472

    
1473
    assert self.owned_locks(locking.LEVEL_NODE_RES)
1474
    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1475

    
1476

    
1477
class LUInstanceReplaceDisks(LogicalUnit):
1478
  """Replace the disks of an instance.
1479

1480
  """
1481
  HPATH = "mirrors-replace"
1482
  HTYPE = constants.HTYPE_INSTANCE
1483
  REQ_BGL = False
1484

    
1485
  def CheckArguments(self):
1486
    """Check arguments.
1487

1488
    """
1489
    remote_node = self.op.remote_node
1490
    ialloc = self.op.iallocator
1491
    if self.op.mode == constants.REPLACE_DISK_CHG:
1492
      if remote_node is None and ialloc is None:
1493
        raise errors.OpPrereqError("When changing the secondary either an"
1494
                                   " iallocator script must be used or the"
1495
                                   " new node given", errors.ECODE_INVAL)
1496
      else:
1497
        CheckIAllocatorOrNode(self, "iallocator", "remote_node")
1498

    
1499
    elif remote_node is not None or ialloc is not None:
1500
      # Not replacing the secondary
1501
      raise errors.OpPrereqError("The iallocator and new node options can"
1502
                                 " only be used when changing the"
1503
                                 " secondary node", errors.ECODE_INVAL)
1504

    
1505
  def ExpandNames(self):
1506
    self._ExpandAndLockInstance()
1507

    
1508
    assert locking.LEVEL_NODE not in self.needed_locks
1509
    assert locking.LEVEL_NODE_RES not in self.needed_locks
1510
    assert locking.LEVEL_NODEGROUP not in self.needed_locks
1511

    
1512
    assert self.op.iallocator is None or self.op.remote_node is None, \
1513
      "Conflicting options"
1514

    
1515
    if self.op.remote_node is not None:
1516
      self.op.remote_node = ExpandNodeName(self.cfg, self.op.remote_node)
1517

    
1518
      # Warning: do not remove the locking of the new secondary here
1519
      # unless DRBD8Dev.AddChildren is changed to work in parallel;
1520
      # currently it doesn't since parallel invocations of
1521
      # FindUnusedMinor will conflict
1522
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
1523
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1524
    else:
1525
      self.needed_locks[locking.LEVEL_NODE] = []
1526
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1527

    
1528
      if self.op.iallocator is not None:
1529
        # iallocator will select a new node in the same group
1530
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
1531
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1532

    
1533
    self.needed_locks[locking.LEVEL_NODE_RES] = []
1534

    
1535
    self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
1536
                                   self.op.iallocator, self.op.remote_node,
1537
                                   self.op.disks, self.op.early_release,
1538
                                   self.op.ignore_ipolicy)
1539

    
1540
    self.tasklets = [self.replacer]
1541

    
1542
  def DeclareLocks(self, level):
1543
    if level == locking.LEVEL_NODEGROUP:
1544
      assert self.op.remote_node is None
1545
      assert self.op.iallocator is not None
1546
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
1547

    
1548
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
1549
      # Lock all groups used by instance optimistically; this requires going
1550
      # via the node before it's locked, requiring verification later on
1551
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
1552
        self.cfg.GetInstanceNodeGroups(self.op.instance_name)
1553

    
1554
    elif level == locking.LEVEL_NODE:
1555
      if self.op.iallocator is not None:
1556
        assert self.op.remote_node is None
1557
        assert not self.needed_locks[locking.LEVEL_NODE]
1558
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
1559

    
1560
        # Lock member nodes of all locked groups
1561
        self.needed_locks[locking.LEVEL_NODE] = \
1562
          [node_name
1563
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1564
           for node_name in self.cfg.GetNodeGroup(group_uuid).members]
1565
      else:
1566
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1567

    
1568
        self._LockInstancesNodes()
1569

    
1570
    elif level == locking.LEVEL_NODE_RES:
1571
      # Reuse node locks
1572
      self.needed_locks[locking.LEVEL_NODE_RES] = \
1573
        self.needed_locks[locking.LEVEL_NODE]
1574

    
1575
  def BuildHooksEnv(self):
1576
    """Build hooks env.
1577

1578
    This runs on the master, the primary and all the secondaries.
1579

1580
    """
1581
    instance = self.replacer.instance
1582
    env = {
1583
      "MODE": self.op.mode,
1584
      "NEW_SECONDARY": self.op.remote_node,
1585
      "OLD_SECONDARY": instance.secondary_nodes[0],
1586
      }
1587
    env.update(BuildInstanceHookEnvByObject(self, instance))
1588
    return env
1589

    
1590
  def BuildHooksNodes(self):
1591
    """Build hooks nodes.
1592

1593
    """
1594
    instance = self.replacer.instance
1595
    nl = [
1596
      self.cfg.GetMasterNode(),
1597
      instance.primary_node,
1598
      ]
1599
    if self.op.remote_node is not None:
1600
      nl.append(self.op.remote_node)
1601
    return nl, nl
1602

    
1603
  def CheckPrereq(self):
1604
    """Check prerequisites.
1605

1606
    """
1607
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
1608
            self.op.iallocator is None)
1609

    
1610
    # Verify if node group locks are still correct
1611
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
1612
    if owned_groups:
1613
      CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
1614

    
1615
    return LogicalUnit.CheckPrereq(self)
1616

    
1617

    
1618
class LUInstanceActivateDisks(NoHooksLU):
1619
  """Bring up an instance's disks.
1620

1621
  """
1622
  REQ_BGL = False
1623

    
1624
  def ExpandNames(self):
1625
    self._ExpandAndLockInstance()
1626
    self.needed_locks[locking.LEVEL_NODE] = []
1627
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1628

    
1629
  def DeclareLocks(self, level):
1630
    if level == locking.LEVEL_NODE:
1631
      self._LockInstancesNodes()
1632

    
1633
  def CheckPrereq(self):
1634
    """Check prerequisites.
1635

1636
    This checks that the instance is in the cluster.
1637

1638
    """
1639
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1640
    assert self.instance is not None, \
1641
      "Cannot retrieve locked instance %s" % self.op.instance_name
1642
    CheckNodeOnline(self, self.instance.primary_node)
1643

    
1644
  def Exec(self, feedback_fn):
1645
    """Activate the disks.
1646

1647
    """
1648
    disks_ok, disks_info = \
1649
              AssembleInstanceDisks(self, self.instance,
1650
                                    ignore_size=self.op.ignore_size)
1651
    if not disks_ok:
1652
      raise errors.OpExecError("Cannot activate block devices")
1653

    
1654
    if self.op.wait_for_sync:
1655
      if not WaitForSync(self, self.instance):
1656
        raise errors.OpExecError("Some disks of the instance are degraded!")
1657

    
1658
    return disks_info
1659

    
1660

    
1661
class LUInstanceDeactivateDisks(NoHooksLU):
1662
  """Shutdown an instance's disks.
1663

1664
  """
1665
  REQ_BGL = False
1666

    
1667
  def ExpandNames(self):
1668
    self._ExpandAndLockInstance()
1669
    self.needed_locks[locking.LEVEL_NODE] = []
1670
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1671

    
1672
  def DeclareLocks(self, level):
1673
    if level == locking.LEVEL_NODE:
1674
      self._LockInstancesNodes()
1675

    
1676
  def CheckPrereq(self):
1677
    """Check prerequisites.
1678

1679
    This checks that the instance is in the cluster.
1680

1681
    """
1682
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1683
    assert self.instance is not None, \
1684
      "Cannot retrieve locked instance %s" % self.op.instance_name
1685

    
1686
  def Exec(self, feedback_fn):
1687
    """Deactivate the disks
1688

1689
    """
1690
    instance = self.instance
1691
    if self.op.force:
1692
      ShutdownInstanceDisks(self, instance)
1693
    else:
1694
      _SafeShutdownInstanceDisks(self, instance)
1695

    
1696

    
1697
def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
1698
                               ldisk=False):
1699
  """Check that mirrors are not degraded.
1700

1701
  @attention: The device has to be annotated already.
1702

1703
  The ldisk parameter, if True, will change the test from the
1704
  is_degraded attribute (which represents overall non-ok status for
1705
  the device(s)) to the ldisk (representing the local storage status).
1706

1707
  """
1708
  lu.cfg.SetDiskID(dev, node)
1709

    
1710
  result = True
1711

    
1712
  if on_primary or dev.AssembleOnSecondary():
1713
    rstats = lu.rpc.call_blockdev_find(node, dev)
1714
    msg = rstats.fail_msg
1715
    if msg:
1716
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1717
      result = False
1718
    elif not rstats.payload:
1719
      lu.LogWarning("Can't find disk on node %s", node)
1720
      result = False
1721
    else:
1722
      if ldisk:
1723
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
1724
      else:
1725
        result = result and not rstats.payload.is_degraded
1726

    
1727
  if dev.children:
1728
    for child in dev.children:
1729
      result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
1730
                                                     on_primary)
1731

    
1732
  return result
1733

    
1734

    
1735
def CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
1736
  """Wrapper around L{_CheckDiskConsistencyInner}.
1737

1738
  """
1739
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1740
  return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
1741
                                    ldisk=ldisk)
1742

    
1743

    
1744
def _BlockdevFind(lu, node, dev, instance):
1745
  """Wrapper around call_blockdev_find to annotate diskparams.
1746

1747
  @param lu: A reference to the lu object
1748
  @param node: The node to call out
1749
  @param dev: The device to find
1750
  @param instance: The instance object the device belongs to
1751
  @returns The result of the rpc call
1752

1753
  """
1754
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1755
  return lu.rpc.call_blockdev_find(node, disk)
1756

    
1757

    
1758
def _GenerateUniqueNames(lu, exts):
1759
  """Generate a suitable LV name.
1760

1761
  This will generate a logical volume name for the given instance.
1762

1763
  """
1764
  results = []
1765
  for val in exts:
1766
    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
1767
    results.append("%s%s" % (new_id, val))
1768
  return results
1769

    
1770

    
1771
class TLReplaceDisks(Tasklet):
1772
  """Replaces disks for an instance.
1773

1774
  Note: Locking is not within the scope of this class.
1775

1776
  """
1777
  def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
1778
               disks, early_release, ignore_ipolicy):
1779
    """Initializes this class.
1780

1781
    """
1782
    Tasklet.__init__(self, lu)
1783

    
1784
    # Parameters
1785
    self.instance_name = instance_name
1786
    self.mode = mode
1787
    self.iallocator_name = iallocator_name
1788
    self.remote_node = remote_node
1789
    self.disks = disks
1790
    self.early_release = early_release
1791
    self.ignore_ipolicy = ignore_ipolicy
1792

    
1793
    # Runtime data
1794
    self.instance = None
1795
    self.new_node = None
1796
    self.target_node = None
1797
    self.other_node = None
1798
    self.remote_node_info = None
1799
    self.node_secondary_ip = None
1800

    
1801
  @staticmethod
1802
  def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
1803
    """Compute a new secondary node using an IAllocator.
1804

1805
    """
1806
    req = iallocator.IAReqRelocate(name=instance_name,
1807
                                   relocate_from=list(relocate_from))
1808
    ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
1809

    
1810
    ial.Run(iallocator_name)
1811

    
1812
    if not ial.success:
1813
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
1814
                                 " %s" % (iallocator_name, ial.info),
1815
                                 errors.ECODE_NORES)
1816

    
1817
    remote_node_name = ial.result[0]
1818

    
1819
    lu.LogInfo("Selected new secondary for instance '%s': %s",
1820
               instance_name, remote_node_name)
1821

    
1822
    return remote_node_name
1823

    
1824
  def _FindFaultyDisks(self, node_name):
1825
    """Wrapper for L{FindFaultyInstanceDisks}.
1826

1827
    """
1828
    return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
1829
                                   node_name, True)
1830

    
1831
  def _CheckDisksActivated(self, instance):
1832
    """Checks if the instance disks are activated.
1833

1834
    @param instance: The instance to check disks
1835
    @return: True if they are activated, False otherwise
1836

1837
    """
1838
    nodes = instance.all_nodes
1839

    
1840
    for idx, dev in enumerate(instance.disks):
1841
      for node in nodes:
1842
        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
1843
        self.cfg.SetDiskID(dev, node)
1844

    
1845
        result = _BlockdevFind(self, node, dev, instance)
1846

    
1847
        if result.offline:
1848
          continue
1849
        elif result.fail_msg or not result.payload:
1850
          return False
1851

    
1852
    return True
1853

    
1854
  def CheckPrereq(self):
1855
    """Check prerequisites.
1856

1857
    This checks that the instance is in the cluster.
1858

1859
    """
1860
    self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
1861
    assert instance is not None, \
1862
      "Cannot retrieve locked instance %s" % self.instance_name
1863

    
1864
    if instance.disk_template != constants.DT_DRBD8:
1865
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
1866
                                 " instances", errors.ECODE_INVAL)
1867

    
1868
    if len(instance.secondary_nodes) != 1:
1869
      raise errors.OpPrereqError("The instance has a strange layout,"
1870
                                 " expected one secondary but found %d" %
1871
                                 len(instance.secondary_nodes),
1872
                                 errors.ECODE_FAULT)
1873

    
1874
    instance = self.instance
1875
    secondary_node = instance.secondary_nodes[0]
1876

    
1877
    if self.iallocator_name is None:
1878
      remote_node = self.remote_node
1879
    else:
1880
      remote_node = self._RunAllocator(self.lu, self.iallocator_name,
1881
                                       instance.name, instance.secondary_nodes)
1882

    
1883
    if remote_node is None:
1884
      self.remote_node_info = None
1885
    else:
1886
      assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \
1887
             "Remote node '%s' is not locked" % remote_node
1888

    
1889
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
1890
      assert self.remote_node_info is not None, \
1891
        "Cannot retrieve locked node %s" % remote_node
1892

    
1893
    if remote_node == self.instance.primary_node:
1894
      raise errors.OpPrereqError("The specified node is the primary node of"
1895
                                 " the instance", errors.ECODE_INVAL)
1896

    
1897
    if remote_node == secondary_node:
1898
      raise errors.OpPrereqError("The specified node is already the"
1899
                                 " secondary node of the instance",
1900
                                 errors.ECODE_INVAL)
1901

    
1902
    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
1903
                                    constants.REPLACE_DISK_CHG):
1904
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
1905
                                 errors.ECODE_INVAL)
1906

    
1907
    if self.mode == constants.REPLACE_DISK_AUTO:
1908
      if not self._CheckDisksActivated(instance):
1909
        raise errors.OpPrereqError("Please run activate-disks on instance %s"
1910
                                   " first" % self.instance_name,
1911
                                   errors.ECODE_STATE)
1912
      faulty_primary = self._FindFaultyDisks(instance.primary_node)
1913
      faulty_secondary = self._FindFaultyDisks(secondary_node)
1914

    
1915
      if faulty_primary and faulty_secondary:
1916
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
1917
                                   " one node and can not be repaired"
1918
                                   " automatically" % self.instance_name,
1919
                                   errors.ECODE_STATE)
1920

    
1921
      if faulty_primary:
1922
        self.disks = faulty_primary
1923
        self.target_node = instance.primary_node
1924
        self.other_node = secondary_node
1925
        check_nodes = [self.target_node, self.other_node]
1926
      elif faulty_secondary:
1927
        self.disks = faulty_secondary
1928
        self.target_node = secondary_node
1929
        self.other_node = instance.primary_node
1930
        check_nodes = [self.target_node, self.other_node]
1931
      else:
1932
        self.disks = []
1933
        check_nodes = []
1934

    
1935
    else:
1936
      # Non-automatic modes
1937
      if self.mode == constants.REPLACE_DISK_PRI:
1938
        self.target_node = instance.primary_node
1939
        self.other_node = secondary_node
1940
        check_nodes = [self.target_node, self.other_node]
1941

    
1942
      elif self.mode == constants.REPLACE_DISK_SEC:
1943
        self.target_node = secondary_node
1944
        self.other_node = instance.primary_node
1945
        check_nodes = [self.target_node, self.other_node]
1946

    
1947
      elif self.mode == constants.REPLACE_DISK_CHG:
1948
        self.new_node = remote_node
1949
        self.other_node = instance.primary_node
1950
        self.target_node = secondary_node
1951
        check_nodes = [self.new_node, self.other_node]
1952

    
1953
        CheckNodeNotDrained(self.lu, remote_node)
1954
        CheckNodeVmCapable(self.lu, remote_node)
1955

    
1956
        old_node_info = self.cfg.GetNodeInfo(secondary_node)
1957
        assert old_node_info is not None
1958
        if old_node_info.offline and not self.early_release:
1959
          # doesn't make sense to delay the release
1960
          self.early_release = True
1961
          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
1962
                          " early-release mode", secondary_node)
1963

    
1964
      else:
1965
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
1966
                                     self.mode)
1967

    
1968
      # If not specified all disks should be replaced
1969
      if not self.disks:
1970
        self.disks = range(len(self.instance.disks))
1971

    
1972
    # TODO: This is ugly, but right now we can't distinguish between internal
1973
    # submitted opcode and external one. We should fix that.
1974
    if self.remote_node_info:
1975
      # We change the node, lets verify it still meets instance policy
1976
      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
1977
      cluster = self.cfg.GetClusterInfo()
1978
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1979
                                                              new_group_info)
1980
      CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
1981
                             self.cfg, ignore=self.ignore_ipolicy)
1982

    
1983
    for node in check_nodes:
1984
      CheckNodeOnline(self.lu, node)
1985

    
1986
    touched_nodes = frozenset(node_name for node_name in [self.new_node,
1987
                                                          self.other_node,
1988
                                                          self.target_node]
1989
                              if node_name is not None)
1990

    
1991
    # Release unneeded node and node resource locks
1992
    ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
1993
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
1994
    ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
1995

    
1996
    # Release any owned node group
1997
    ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
1998

    
1999
    # Check whether disks are valid
2000
    for disk_idx in self.disks:
2001
      instance.FindDisk(disk_idx)
2002

    
2003
    # Get secondary node IP addresses
2004
    self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node)
2005
                                  in self.cfg.GetMultiNodeInfo(touched_nodes))
2006

    
2007
  def Exec(self, feedback_fn):
2008
    """Execute disk replacement.
2009

2010
    This dispatches the disk replacement to the appropriate handler.
2011

2012
    """
2013
    if __debug__:
2014
      # Verify owned locks before starting operation
2015
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
2016
      assert set(owned_nodes) == set(self.node_secondary_ip), \
2017
          ("Incorrect node locks, owning %s, expected %s" %
2018
           (owned_nodes, self.node_secondary_ip.keys()))
2019
      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
2020
              self.lu.owned_locks(locking.LEVEL_NODE_RES))
2021
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
2022

    
2023
      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
2024
      assert list(owned_instances) == [self.instance_name], \
2025
          "Instance '%s' not locked" % self.instance_name
2026

    
2027
      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
2028
          "Should not own any node group lock at this point"
2029

    
2030
    if not self.disks:
2031
      feedback_fn("No disks need replacement for instance '%s'" %
2032
                  self.instance.name)
2033
      return
2034

    
2035
    feedback_fn("Replacing disk(s) %s for instance '%s'" %
2036
                (utils.CommaJoin(self.disks), self.instance.name))
2037
    feedback_fn("Current primary node: %s" % self.instance.primary_node)
2038
    feedback_fn("Current seconary node: %s" %
2039
                utils.CommaJoin(self.instance.secondary_nodes))
2040

    
2041
    activate_disks = (self.instance.admin_state != constants.ADMINST_UP)
2042

    
2043
    # Activate the instance disks if we're replacing them on a down instance
2044
    if activate_disks:
2045
      StartInstanceDisks(self.lu, self.instance, True)
2046

    
2047
    try:
2048
      # Should we replace the secondary node?
2049
      if self.new_node is not None:
2050
        fn = self._ExecDrbd8Secondary
2051
      else:
2052
        fn = self._ExecDrbd8DiskOnly
2053

    
2054
      result = fn(feedback_fn)
2055
    finally:
2056
      # Deactivate the instance disks if we're replacing them on a
2057
      # down instance
2058
      if activate_disks:
2059
        _SafeShutdownInstanceDisks(self.lu, self.instance)
2060

    
2061
    assert not self.lu.owned_locks(locking.LEVEL_NODE)
2062

    
2063
    if __debug__:
2064
      # Verify owned locks
2065
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
2066
      nodes = frozenset(self.node_secondary_ip)
2067
      assert ((self.early_release and not owned_nodes) or
2068
              (not self.early_release and not (set(owned_nodes) - nodes))), \
2069
        ("Not owning the correct locks, early_release=%s, owned=%r,"
2070
         " nodes=%r" % (self.early_release, owned_nodes, nodes))
2071

    
2072
    return result
2073

    
2074
  def _CheckVolumeGroup(self, nodes):
2075
    self.lu.LogInfo("Checking volume groups")
2076

    
2077
    vgname = self.cfg.GetVGName()
2078

    
2079
    # Make sure volume group exists on all involved nodes
2080
    results = self.rpc.call_vg_list(nodes)
2081
    if not results:
2082
      raise errors.OpExecError("Can't list volume groups on the nodes")
2083

    
2084
    for node in nodes:
2085
      res = results[node]
2086
      res.Raise("Error checking node %s" % node)
2087
      if vgname not in res.payload:
2088
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
2089
                                 (vgname, node))
2090

    
2091
  def _CheckDisksExistence(self, nodes):
2092
    # Check disk existence
2093
    for idx, dev in enumerate(self.instance.disks):
2094
      if idx not in self.disks:
2095
        continue
2096

    
2097
      for node in nodes:
2098
        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
2099
        self.cfg.SetDiskID(dev, node)
2100

    
2101
        result = _BlockdevFind(self, node, dev, self.instance)
2102

    
2103
        msg = result.fail_msg
2104
        if msg or not result.payload:
2105
          if not msg:
2106
            msg = "disk not found"
2107
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
2108
                                   (idx, node, msg))
2109

    
2110
  def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
2111
    for idx, dev in enumerate(self.instance.disks):
2112
      if idx not in self.disks:
2113
        continue
2114

    
2115
      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
2116
                      (idx, node_name))
2117

    
2118
      if not CheckDiskConsistency(self.lu, self.instance, dev, node_name,
2119
                                  on_primary, ldisk=ldisk):
2120
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
2121
                                 " replace disks for instance %s" %
2122
                                 (node_name, self.instance.name))
2123

    
2124
  def _CreateNewStorage(self, node_name):
2125
    """Create new storage on the primary or secondary node.
2126

2127
    This is only used for same-node replaces, not for changing the
2128
    secondary node, hence we don't want to modify the existing disk.
2129

2130
    """
2131
    iv_names = {}
2132

    
2133
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2134
    for idx, dev in enumerate(disks):
2135
      if idx not in self.disks:
2136
        continue
2137

    
2138
      self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx)
2139

    
2140
      self.cfg.SetDiskID(dev, node_name)
2141

    
2142
      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
2143
      names = _GenerateUniqueNames(self.lu, lv_names)
2144

    
2145
      (data_disk, meta_disk) = dev.children
2146
      vg_data = data_disk.logical_id[0]
2147
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
2148
                             logical_id=(vg_data, names[0]),
2149
                             params=data_disk.params)
2150
      vg_meta = meta_disk.logical_id[0]
2151
      lv_meta = objects.Disk(dev_type=constants.LD_LV,
2152
                             size=constants.DRBD_META_SIZE,
2153
                             logical_id=(vg_meta, names[1]),
2154
                             params=meta_disk.params)
2155

    
2156
      new_lvs = [lv_data, lv_meta]
2157
      old_lvs = [child.Copy() for child in dev.children]
2158
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
2159
      excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)
2160

    
2161
      # we pass force_create=True to force the LVM creation
2162
      for new_lv in new_lvs:
2163
        _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
2164
                             GetInstanceInfoText(self.instance), False,
2165
                             excl_stor)
2166

    
2167
    return iv_names
2168

    
2169
  def _CheckDevices(self, node_name, iv_names):
2170
    for name, (dev, _, _) in iv_names.iteritems():
2171
      self.cfg.SetDiskID(dev, node_name)
2172

    
2173
      result = _BlockdevFind(self, node_name, dev, self.instance)
2174

    
2175
      msg = result.fail_msg
2176
      if msg or not result.payload:
2177
        if not msg:
2178
          msg = "disk not found"
2179
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
2180
                                 (name, msg))
2181

    
2182
      if result.payload.is_degraded:
2183
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
2184

    
2185
  def _RemoveOldStorage(self, node_name, iv_names):
2186
    for name, (_, old_lvs, _) in iv_names.iteritems():
2187
      self.lu.LogInfo("Remove logical volumes for %s", name)
2188

    
2189
      for lv in old_lvs:
2190
        self.cfg.SetDiskID(lv, node_name)
2191

    
2192
        msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
2193
        if msg:
2194
          self.lu.LogWarning("Can't remove old LV: %s", msg,
2195
                             hint="remove unused LVs manually")
2196

    
2197
  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
2198
    """Replace a disk on the primary or secondary for DRBD 8.
2199

2200
    The algorithm for replace is quite complicated:
2201

2202
      1. for each disk to be replaced:
2203

2204
        1. create new LVs on the target node with unique names
2205
        1. detach old LVs from the drbd device
2206
        1. rename old LVs to name_replaced.<time_t>
2207
        1. rename new LVs to old LVs
2208
        1. attach the new LVs (with the old names now) to the drbd device
2209

2210
      1. wait for sync across all devices
2211

2212
      1. for each modified disk:
2213

2214
        1. remove old LVs (which have the name name_replaces.<time_t>)
2215

2216
    Failures are not very well handled.
2217

2218
    """
2219
    steps_total = 6
2220

    
2221
    # Step: check device activation
2222
    self.lu.LogStep(1, steps_total, "Check device existence")
2223
    self._CheckDisksExistence([self.other_node, self.target_node])
2224
    self._CheckVolumeGroup([self.target_node, self.other_node])
2225

    
2226
    # Step: check other node consistency
2227
    self.lu.LogStep(2, steps_total, "Check peer consistency")
2228
    self._CheckDisksConsistency(self.other_node,
2229
                                self.other_node == self.instance.primary_node,
2230
                                False)
2231

    
2232
    # Step: create new storage
2233
    self.lu.LogStep(3, steps_total, "Allocate new storage")
2234
    iv_names = self._CreateNewStorage(self.target_node)
2235

    
2236
    # Step: for each lv, detach+rename*2+attach
2237
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2238
    for dev, old_lvs, new_lvs in iv_names.itervalues():
2239
      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
2240

    
2241
      result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
2242
                                                     old_lvs)
2243
      result.Raise("Can't detach drbd from local storage on node"
2244
                   " %s for device %s" % (self.target_node, dev.iv_name))
2245
      #dev.children = []
2246
      #cfg.Update(instance)
2247

    
2248
      # ok, we created the new LVs, so now we know we have the needed
2249
      # storage; as such, we proceed on the target node to rename
2250
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
2251
      # using the assumption that logical_id == physical_id (which in
2252
      # turn is the unique_id on that node)
2253

    
2254
      # FIXME(iustin): use a better name for the replaced LVs
2255
      temp_suffix = int(time.time())
2256
      ren_fn = lambda d, suff: (d.physical_id[0],
2257
                                d.physical_id[1] + "_replaced-%s" % suff)
2258

    
2259
      # Build the rename list based on what LVs exist on the node
2260
      rename_old_to_new = []
2261
      for to_ren in old_lvs:
2262
        result = self.rpc.call_blockdev_find(self.target_node, to_ren)
2263
        if not result.fail_msg and result.payload:
2264
          # device exists
2265
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
2266

    
2267
      self.lu.LogInfo("Renaming the old LVs on the target node")
2268
      result = self.rpc.call_blockdev_rename(self.target_node,
2269
                                             rename_old_to_new)
2270
      result.Raise("Can't rename old LVs on node %s" % self.target_node)
2271

    
2272
      # Now we rename the new LVs to the old LVs
2273
      self.lu.LogInfo("Renaming the new LVs on the target node")
2274
      rename_new_to_old = [(new, old.physical_id)
2275
                           for old, new in zip(old_lvs, new_lvs)]
2276
      result = self.rpc.call_blockdev_rename(self.target_node,
2277
                                             rename_new_to_old)
2278
      result.Raise("Can't rename new LVs on node %s" % self.target_node)
2279

    
2280
      # Intermediate steps of in memory modifications
2281
      for old, new in zip(old_lvs, new_lvs):
2282
        new.logical_id = old.logical_id
2283
        self.cfg.SetDiskID(new, self.target_node)
2284

    
2285
      # We need to modify old_lvs so that removal later removes the
2286
      # right LVs, not the newly added ones; note that old_lvs is a
2287
      # copy here
2288
      for disk in old_lvs:
2289
        disk.logical_id = ren_fn(disk, temp_suffix)
2290
        self.cfg.SetDiskID(disk, self.target_node)
2291

    
2292
      # Now that the new lvs have the old name, we can add them to the device
2293
      self.lu.LogInfo("Adding new mirror component on %s", self.target_node)
2294
      result = self.rpc.call_blockdev_addchildren(self.target_node,
2295
                                                  (dev, self.instance), new_lvs)
2296
      msg = result.fail_msg
2297
      if msg:
2298
        for new_lv in new_lvs:
2299
          msg2 = self.rpc.call_blockdev_remove(self.target_node,
2300
                                               new_lv).fail_msg
2301
          if msg2:
2302
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
2303
                               hint=("cleanup manually the unused logical"
2304
                                     "volumes"))
2305
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
2306

    
2307
    cstep = itertools.count(5)
2308

    
2309
    if self.early_release:
2310
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2311
      self._RemoveOldStorage(self.target_node, iv_names)
2312
      # TODO: Check if releasing locks early still makes sense
2313
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2314
    else:
2315
      # Release all resource locks except those used by the instance
2316
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2317
                   keep=self.node_secondary_ip.keys())
2318

    
2319
    # Release all node locks while waiting for sync
2320
    ReleaseLocks(self.lu, locking.LEVEL_NODE)
2321

    
2322
    # TODO: Can the instance lock be downgraded here? Take the optional disk
2323
    # shutdown in the caller into consideration.
2324

    
2325
    # Wait for sync
2326
    # This can fail as the old devices are degraded and _WaitForSync
2327
    # does a combined result over all disks, so we don't check its return value
2328
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2329
    WaitForSync(self.lu, self.instance)
2330

    
2331
    # Check all devices manually
2332
    self._CheckDevices(self.instance.primary_node, iv_names)
2333

    
2334
    # Step: remove old storage
2335
    if not self.early_release:
2336
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2337
      self._RemoveOldStorage(self.target_node, iv_names)
2338

    
2339
  def _ExecDrbd8Secondary(self, feedback_fn):
2340
    """Replace the secondary node for DRBD 8.
2341

2342
    The algorithm for replace is quite complicated:
2343
      - for all disks of the instance:
2344
        - create new LVs on the new node with same names
2345
        - shutdown the drbd device on the old secondary
2346
        - disconnect the drbd network on the primary
2347
        - create the drbd device on the new secondary
2348
        - network attach the drbd on the primary, using an artifice:
2349
          the drbd code for Attach() will connect to the network if it
2350
          finds a device which is connected to the good local disks but
2351
          not network enabled
2352
      - wait for sync across all devices
2353
      - remove all disks from the old secondary
2354

2355
    Failures are not very well handled.
2356

2357
    """
2358
    steps_total = 6
2359

    
2360
    pnode = self.instance.primary_node
2361

    
2362
    # Step: check device activation
2363
    self.lu.LogStep(1, steps_total, "Check device existence")
2364
    self._CheckDisksExistence([self.instance.primary_node])
2365
    self._CheckVolumeGroup([self.instance.primary_node])
2366

    
2367
    # Step: check other node consistency
2368
    self.lu.LogStep(2, steps_total, "Check peer consistency")
2369
    self._CheckDisksConsistency(self.instance.primary_node, True, True)
2370

    
2371
    # Step: create new storage
2372
    self.lu.LogStep(3, steps_total, "Allocate new storage")
2373
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2374
    excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
2375
    for idx, dev in enumerate(disks):
2376
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
2377
                      (self.new_node, idx))
2378
      # we pass force_create=True to force LVM creation
2379
      for new_lv in dev.children:
2380
        _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
2381
                             True, GetInstanceInfoText(self.instance), False,
2382
                             excl_stor)
2383

    
2384
    # Step 4: dbrd minors and drbd setups changes
2385
    # after this, we must manually remove the drbd minors on both the
2386
    # error and the success paths
2387
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2388
    minors = self.cfg.AllocateDRBDMinor([self.new_node
2389
                                         for dev in self.instance.disks],
2390
                                        self.instance.name)
2391
    logging.debug("Allocated minors %r", minors)
2392

    
2393
    iv_names = {}
2394
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
2395
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
2396
                      (self.new_node, idx))
2397
      # create new devices on new_node; note that we create two IDs:
2398
      # one without port, so the drbd will be activated without
2399
      # networking information on the new node at this stage, and one
2400
      # with network, for the latter activation in step 4
2401
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
2402
      if self.instance.primary_node == o_node1:
2403
        p_minor = o_minor1
2404
      else:
2405
        assert self.instance.primary_node == o_node2, "Three-node instance?"
2406
        p_minor = o_minor2
2407

    
2408
      new_alone_id = (self.instance.primary_node, self.new_node, None,
2409
                      p_minor, new_minor, o_secret)
2410
      new_net_id = (self.instance.primary_node, self.new_node, o_port,
2411
                    p_minor, new_minor, o_secret)
2412

    
2413
      iv_names[idx] = (dev, dev.children, new_net_id)
2414
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
2415
                    new_net_id)
2416
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
2417
                              logical_id=new_alone_id,
2418
                              children=dev.children,
2419
                              size=dev.size,
2420
                              params={})
2421
      (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
2422
                                            self.cfg)
2423
      try:
2424
        CreateSingleBlockDev(self.lu, self.new_node, self.instance,
2425
                             anno_new_drbd,
2426
                             GetInstanceInfoText(self.instance), False,
2427
                             excl_stor)
2428
      except errors.GenericError:
2429
        self.cfg.ReleaseDRBDMinors(self.instance.name)
2430
        raise
2431

    
2432
    # We have new devices, shutdown the drbd on the old secondary
2433
    for idx, dev in enumerate(self.instance.disks):
2434
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
2435
      self.cfg.SetDiskID(dev, self.target_node)
2436
      msg = self.rpc.call_blockdev_shutdown(self.target_node,
2437
                                            (dev, self.instance)).fail_msg
2438
      if msg:
2439
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
2440
                           "node: %s" % (idx, msg),
2441
                           hint=("Please cleanup this device manually as"
2442
                                 " soon as possible"))
2443

    
2444
    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2445
    result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
2446
                                               self.instance.disks)[pnode]
2447

    
2448
    msg = result.fail_msg
2449
    if msg:
2450
      # detaches didn't succeed (unlikely)
2451
      self.cfg.ReleaseDRBDMinors(self.instance.name)
2452
      raise errors.OpExecError("Can't detach the disks from the network on"
2453
                               " old node: %s" % (msg,))
2454

    
2455
    # if we managed to detach at least one, we update all the disks of
2456
    # the instance to point to the new secondary
2457
    self.lu.LogInfo("Updating instance configuration")
2458
    for dev, _, new_logical_id in iv_names.itervalues():
2459
      dev.logical_id = new_logical_id
2460
      self.cfg.SetDiskID(dev, self.instance.primary_node)
2461

    
2462
    self.cfg.Update(self.instance, feedback_fn)
2463

    
2464
    # Release all node locks (the configuration has been updated)
2465
    ReleaseLocks(self.lu, locking.LEVEL_NODE)
2466

    
2467
    # and now perform the drbd attach
2468
    self.lu.LogInfo("Attaching primary drbds to new secondary"
2469
                    " (standalone => connected)")
2470
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2471
                                            self.new_node],
2472
                                           self.node_secondary_ip,
2473
                                           (self.instance.disks, self.instance),
2474
                                           self.instance.name,
2475
                                           False)
2476
    for to_node, to_result in result.items():
2477
      msg = to_result.fail_msg
2478
      if msg:
2479
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
2480
                           to_node, msg,
2481
                           hint=("please do a gnt-instance info to see the"
2482
                                 " status of disks"))
2483

    
2484
    cstep = itertools.count(5)
2485

    
2486
    if self.early_release:
2487
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2488
      self._RemoveOldStorage(self.target_node, iv_names)
2489
      # TODO: Check if releasing locks early still makes sense
2490
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2491
    else:
2492
      # Release all resource locks except those used by the instance
2493
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2494
                   keep=self.node_secondary_ip.keys())
2495

    
2496
    # TODO: Can the instance lock be downgraded here? Take the optional disk
2497
    # shutdown in the caller into consideration.
2498

    
2499
    # Wait for sync
2500
    # This can fail as the old devices are degraded and _WaitForSync
2501
    # does a combined result over all disks, so we don't check its return value
2502
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2503
    WaitForSync(self.lu, self.instance)
2504

    
2505
    # Check all devices manually
2506
    self._CheckDevices(self.instance.primary_node, iv_names)
2507

    
2508
    # Step: remove old storage
2509
    if not self.early_release:
2510
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2511
      self._RemoveOldStorage(self.target_node, iv_names)