lib/cmdlib/instance_storage.py @ 6839584c


#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with storage of instances."""

import itertools
import logging
import os
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import ht
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import objects
from ganeti import utils
from ganeti import opcodes
from ganeti import rpc
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
  AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeName, \
  CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
  IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks
from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
  CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy

import ganeti.masterd.instance


_DISK_TEMPLATE_NAME_PREFIX = {
  constants.DT_PLAIN: "",
  constants.DT_RBD: ".rbd",
  constants.DT_EXT: ".ext",
  constants.DT_FILE: ".file",
  constants.DT_SHARED_FILE: ".sharedfile",
  }


_DISK_TEMPLATE_DEVICE_TYPE = {
  constants.DT_PLAIN: constants.LD_LV,
  constants.DT_FILE: constants.LD_FILE,
  constants.DT_SHARED_FILE: constants.LD_FILE,
  constants.DT_BLOCK: constants.LD_BLOCKDEV,
  constants.DT_RBD: constants.LD_RBD,
  constants.DT_EXT: constants.LD_EXT,
  }


def CreateSingleBlockDev(lu, node, instance, device, info, force_open,
                         excl_stor):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as an LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  """
  lu.cfg.SetDiskID(device, node)
  result = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, force_open, info,
                                       excl_stor)
  result.Raise("Can't create block device %s on"
               " node %s for instance %s" % (device, node, instance.name))
  if device.physical_id is None:
    device.physical_id = result.payload


def _CreateBlockDevInner(lu, node, instance, device, force_create,
                         info, force_open, excl_stor):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @attention: The device has to be annotated already.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has
      CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as an LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  @return: list of created devices
  """
  created_devices = []
  try:
    if device.CreateOnSecondary():
      force_create = True

    if device.children:
      for child in device.children:
        devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
                                    info, force_open, excl_stor)
        created_devices.extend(devs)

    if not force_create:
      return created_devices

    CreateSingleBlockDev(lu, node, instance, device, info, force_open,
                         excl_stor)
    # The device has been completely created, so there is no point in keeping
    # its subdevices in the list. We just add the device itself instead.
    created_devices = [(node, device)]
    return created_devices

  except errors.DeviceCreationError, e:
    e.created_devices.extend(created_devices)
    raise e
  except errors.OpExecError, e:
    raise errors.DeviceCreationError(str(e), created_devices)


def IsExclusiveStorageEnabledNodeName(cfg, nodename):
  """Whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type nodename: string
  @param nodename: The node
  @rtype: bool
  @return: The effective value of exclusive_storage
  @raise errors.OpPrereqError: if no node exists with the given name

  """
  ni = cfg.GetNodeInfo(nodename)
  if ni is None:
    raise errors.OpPrereqError("Invalid node name %s" % nodename,
                               errors.ECODE_NOENT)
  return IsExclusiveStorageEnabledNode(cfg, ni)


def _CreateBlockDev(lu, node, instance, device, force_create, info,
                    force_open):
  """Wrapper around L{_CreateBlockDevInner}.

  This method annotates the root device first.

  """
  (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
  excl_stor = IsExclusiveStorageEnabledNodeName(lu.cfg, node)
  return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
                              force_open, excl_stor)


def _UndoCreateDisks(lu, disks_created):
  """Undo the work performed by L{CreateDisks}.

  This function is called in case of an error to undo the work of
  L{CreateDisks}.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @param disks_created: the result returned by L{CreateDisks}

  """
  for (node, disk) in disks_created:
    lu.cfg.SetDiskID(disk, node)
    result = lu.rpc.call_blockdev_remove(node, disk)
    if result.fail_msg:
      logging.warning("Failed to remove newly-created disk %s on node %s:"
                      " %s", disk, node, result.fail_msg)


def CreateDisks(lu, instance, to_skip=None, target_node=None, disks=None):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @type to_skip: list
  @param to_skip: list of indices to skip
  @type target_node: string
  @param target_node: if passed, overrides the target node for creation
  @type disks: list of {objects.Disk}
  @param disks: the disks to create; if not specified, all the disks of the
      instance are created
  @return: information about the created disks, to be used to call
      L{_UndoCreateDisks}
  @raise errors.OpPrereqError: in case of error

  """
  info = GetInstanceInfoText(instance)
  if target_node is None:
    pnode = instance.primary_node
    all_nodes = instance.all_nodes
  else:
    pnode = target_node
    all_nodes = [pnode]

  if disks is None:
    disks = instance.disks

  if instance.disk_template in constants.DTS_FILEBASED:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)

    result.Raise("Failed to create directory '%s' on"
                 " node %s" % (file_storage_dir, pnode))

  disks_created = []
  for idx, device in enumerate(disks):
    if to_skip and idx in to_skip:
      continue
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
    for node in all_nodes:
      f_create = node == pnode
      try:
        _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
        disks_created.append((node, device))
      except errors.DeviceCreationError, e:
        logging.warning("Creating disk %s for instance '%s' failed",
                        idx, instance.name)
        disks_created.extend(e.created_devices)
        _UndoCreateDisks(lu, disks_created)
        raise errors.OpExecError(e.message)
  return disks_created


def ComputeDiskSizePerVG(disk_template, disks):
  """Compute disk size requirements in the volume group

  """
  def _compute(disks, payload):
    """Universal algorithm.

    """
    vgs = {}
    for disk in disks:
      vgs[disk[constants.IDISK_VG]] = \
        vgs.get(constants.IDISK_VG, 0) + disk[constants.IDISK_SIZE] + payload

    return vgs

  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: {},
    constants.DT_PLAIN: _compute(disks, 0),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
    constants.DT_FILE: {},
    constants.DT_SHARED_FILE: {},
    }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]


def ComputeDisks(op, default_vg):
  """Computes the instance disks.

  @param op: The instance opcode
  @param default_vg: The default_vg to assume

  @return: The computed disks

  """
  disks = []
  for disk in op.disks:
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
    if mode not in constants.DISK_ACCESS_SET:
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                 mode, errors.ECODE_INVAL)
    size = disk.get(constants.IDISK_SIZE, None)
    if size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      size = int(size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                 errors.ECODE_INVAL)

    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
    if ext_provider and op.disk_template != constants.DT_EXT:
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
                                 " disk template, not %s" %
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
                                  op.disk_template), errors.ECODE_INVAL)

    data_vg = disk.get(constants.IDISK_VG, default_vg)
    name = disk.get(constants.IDISK_NAME, None)
    if name is not None and name.lower() == constants.VALUE_NONE:
      name = None
    new_disk = {
      constants.IDISK_SIZE: size,
      constants.IDISK_MODE: mode,
      constants.IDISK_VG: data_vg,
      constants.IDISK_NAME: name,
      }

    if constants.IDISK_METAVG in disk:
      new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG]
    if constants.IDISK_ADOPT in disk:
      new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]

    # For extstorage, demand the `provider' option and add any
    # additional parameters (ext-params) to the dict
    if op.disk_template == constants.DT_EXT:
      if ext_provider:
        new_disk[constants.IDISK_PROVIDER] = ext_provider
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            new_disk[key] = disk[key]
      else:
        raise errors.OpPrereqError("Missing provider for template '%s'" %
                                   constants.DT_EXT, errors.ECODE_INVAL)

    disks.append(new_disk)

  return disks
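
# Illustrative sketch (not part of the original module): assuming the usual
# string values behind the IDISK_* constants, a request such as
#
#   op.disks = [{constants.IDISK_SIZE: "10240"},
#               {constants.IDISK_SIZE: 2048,
#                constants.IDISK_MODE: constants.DISK_RDONLY}]
#
# would be normalized by ComputeDisks(op, "xenvg") into roughly:
#
#   [{constants.IDISK_SIZE: 10240, constants.IDISK_MODE: constants.DISK_RDWR,
#     constants.IDISK_VG: "xenvg", constants.IDISK_NAME: None},
#    {constants.IDISK_SIZE: 2048, constants.IDISK_MODE: constants.DISK_RDONLY,
#     constants.IDISK_VG: "xenvg", constants.IDISK_NAME: None}]
#
# i.e. sizes are coerced to int, the access mode defaults to DISK_RDWR and the
# volume group defaults to the supplied default_vg; "xenvg" is a hypothetical
# VG name used only for this example.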


def CheckRADOSFreeSpace():
  """Compute disk size requirements inside the RADOS cluster.

  """
  # For the RADOS cluster we assume there is always enough space.
  pass


def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
                         iv_name, p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  assert len(vgnames) == len(names) == 2
  port = lu.cfg.AllocatePort()
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())

  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgnames[0], names[0]),
                          params={})
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  dev_meta = objects.Disk(dev_type=constants.LD_LV,
                          size=constants.DRBD_META_SIZE,
                          logical_id=(vgnames[1], names[1]),
                          params={})
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name, params={})
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  return drbd_dev


def GenerateDiskTemplate(
  lu, template_name, instance_name, primary_node, secondary_nodes,
  disk_info, file_storage_dir, file_driver, base_index,
  feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
  _req_shr_file_storage=opcodes.RequireSharedFileStorage):
  """Generate the entire disk layout for a given template type.

  """
  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []

  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, remote_node] * len(disk_info), instance_name)

    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
                                                       full_disk_params)
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]

    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      data_vg = disk.get(constants.IDISK_VG, vgname)
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                      disk[constants.IDISK_SIZE],
                                      [data_vg, meta_vg],
                                      names[idx * 2:idx * 2 + 2],
                                      "disk/%d" % disk_index,
                                      minors[idx * 2], minors[idx * 2 + 1])
      disk_dev.mode = disk[constants.IDISK_MODE]
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disks.append(disk_dev)
  else:
    if secondary_nodes:
      raise errors.ProgrammerError("Wrong template configuration")

    if template_name == constants.DT_FILE:
      _req_file_storage()
    elif template_name == constants.DT_SHARED_FILE:
      _req_shr_file_storage()

    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
    if name_prefix is None:
      names = None
    else:
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
                                        (name_prefix, base_index + i)
                                        for i in range(disk_count)])

    if template_name == constants.DT_PLAIN:

      def logical_id_fn(idx, _, disk):
        vg = disk.get(constants.IDISK_VG, vgname)
        return (vg, names[idx])

    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
      logical_id_fn = \
        lambda _, disk_index, disk: (file_driver,
                                     "%s/%s" % (file_storage_dir,
                                                names[idx]))
    elif template_name == constants.DT_BLOCK:
      logical_id_fn = \
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
                                       disk[constants.IDISK_ADOPT])
    elif template_name == constants.DT_RBD:
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
    elif template_name == constants.DT_EXT:
      def logical_id_fn(idx, _, disk):
        provider = disk.get(constants.IDISK_PROVIDER, None)
        if provider is None:
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
                                       " not found", constants.DT_EXT,
                                       constants.IDISK_PROVIDER)
        return (provider, names[idx])
    else:
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)

    dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]

    for idx, disk in enumerate(disk_info):
      params = {}
      # Only for the Ext template add disk_info to params
      if template_name == constants.DT_EXT:
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            params[key] = disk[key]
      disk_index = idx + base_index
      size = disk[constants.IDISK_SIZE]
      feedback_fn("* disk %s, size %s" %
                  (disk_index, utils.FormatUnit(size, "h")))
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
                              logical_id=logical_id_fn(idx, disk_index, disk),
                              iv_name="disk/%d" % disk_index,
                              mode=disk[constants.IDISK_MODE],
                              params=params)
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
      disks.append(disk_dev)

  return disks
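
# A minimal sketch of the result (assumed behaviour, for illustration only):
# for template_name == constants.DT_PLAIN, two disks and base_index == 0, the
# returned list holds two objects.Disk instances of dev_type LD_LV with
# logical_ids along the lines of
#   ("xenvg", "<generated-name>.disk0") and ("xenvg", "<generated-name>.disk1")
# and iv_names "disk/0" and "disk/1"; the LV names come from
# _GenerateUniqueNames and the VG from each disk's IDISK_VG or the cluster
# default. "xenvg" is a hypothetical volume group name.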


class LUInstanceRecreateDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  _MODIFYABLE = compat.UniqueFrozenset([
    constants.IDISK_SIZE,
    constants.IDISK_MODE,
    ])

  # New or changed disk parameters may have different semantics
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
    constants.IDISK_ADOPT,

    # TODO: Implement support changing VG while recreating
    constants.IDISK_VG,
    constants.IDISK_METAVG,
    constants.IDISK_PROVIDER,
    constants.IDISK_NAME,
    constants.IDISK_SNAPSHOT_NAME,
    ]))

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)

    # FIXME
    # The allocator should actually run in "relocate" mode, but current
    # allocators don't support relocating all the nodes of an instance at
    # the same time. As a workaround we use "allocate" mode, but this is
    # suboptimal for two reasons:
    # - The instance name passed to the allocator is present in the list of
    #   existing instances, so there could be a conflict within the
    #   internal structures of the allocator. This doesn't happen with the
    #   current allocators, but it's a liability.
    # - The allocator counts the resources used by the instance twice: once
    #   because the instance exists already, and once because it tries to
    #   allocate a new instance.
    # The allocator could choose some of the nodes on which the instance is
    # running, but that's not a problem. If the instance nodes are broken,
    # they should already be marked as drained or offline, and hence
    # skipped by the allocator. If instance disks have been lost for other
    # reasons, then recreating the disks on the same nodes should be fine.
    disk_template = self.instance.disk_template
    spindle_use = be_full[constants.BE_SPINDLE_USE]
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
                                        disk_template=disk_template,
                                        tags=list(self.instance.GetTags()),
                                        os=self.instance.os,
                                        nics=[{}],
                                        vcpus=be_full[constants.BE_VCPUS],
                                        memory=be_full[constants.BE_MAXMEM],
                                        spindle_use=spindle_use,
                                        disks=[{constants.IDISK_SIZE: d.size,
                                                constants.IDISK_MODE: d.mode}
                                               for d in self.instance.disks],
                                        hypervisor=self.instance.hypervisor,
                                        node_whitelist=None)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    assert req.RequiredNodes() == len(self.instance.all_nodes)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    self.op.nodes = ial.result
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 utils.CommaJoin(ial.result))

  def CheckArguments(self):
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
      # Normalize and convert deprecated list of disk indices
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]

    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
    if duplicates:
      raise errors.OpPrereqError("Some disks have been specified more than"
                                 " once: %s" % utils.CommaJoin(duplicates),
                                 errors.ECODE_INVAL)

    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
    # when neither iallocator nor nodes are specified
    if self.op.iallocator or self.op.nodes:
      CheckIAllocatorOrNode(self, "iallocator", "nodes")

    for (idx, params) in self.op.disks:
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
      if unsupported:
        raise errors.OpPrereqError("Parameters for disk %s try to change"
                                   " unmodifyable parameter(s): %s" %
                                   (idx, utils.CommaJoin(unsupported)),
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    if self.op.nodes:
      self.op.nodes = [ExpandNodeName(self.cfg, n) for n in self.op.nodes]
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      if self.op.iallocator:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.iallocator is not None
      assert not self.op.nodes
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock the primary group used by the instance optimistically; this
      # requires going via the node before it's locked, requiring
      # verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)

    elif level == locking.LEVEL_NODE:
      # If an allocator is used, then we lock all the nodes in the current
      # instance group, as we don't know yet which ones will be selected;
      # if we replace the nodes without using an allocator, locks are
      # already declared in ExpandNames; otherwise, we need to lock all the
      # instance nodes for disk re-creation
      if self.op.iallocator:
        assert not self.op.nodes
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1

        # Lock member nodes of the group of the primary node
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
          self.needed_locks[locking.LEVEL_NODE].extend(
            self.cfg.GetNodeGroup(group_uuid).members)

        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
      elif not self.op.nodes:
        self._LockInstancesNodes(primary_only=False)
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    return BuildInstanceHookEnvByObject(self, self.instance)

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    if self.op.nodes:
      if len(self.op.nodes) != len(instance.all_nodes):
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
                                   " %d replacement nodes were specified" %
                                   (instance.name, len(instance.all_nodes),
                                    len(self.op.nodes)),
                                   errors.ECODE_INVAL)
      assert instance.disk_template != constants.DT_DRBD8 or \
             len(self.op.nodes) == 2
      assert instance.disk_template != constants.DT_PLAIN or \
             len(self.op.nodes) == 1
      primary_node = self.op.nodes[0]
    else:
      primary_node = instance.primary_node
    if not self.op.iallocator:
      CheckNodeOnline(self, primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name, errors.ECODE_INVAL)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      # Node group locks are acquired only for the primary node (and only
      # when the allocator is used)
      CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
                              primary_only=True)

    # if we replace nodes *and* the old primary is offline, we don't
    # check the instance state
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
    if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                         msg="cannot recreate disks")

    if self.op.disks:
      self.disks = dict(self.op.disks)
    else:
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))

    maxidx = max(self.disks.keys())
    if maxidx >= len(instance.disks):
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
                                 errors.ECODE_INVAL)

    if ((self.op.nodes or self.op.iallocator) and
         sorted(self.disks.keys()) != range(len(instance.disks))):
      raise errors.OpPrereqError("Can't recreate disks partially and"
                                 " change the nodes at the same time",
                                 errors.ECODE_INVAL)

    self.instance = instance

    if self.op.iallocator:
      self._RunAllocator()
      # Release unneeded node and node resource locks
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)

    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

  def Exec(self, feedback_fn):
    """Recreate the disks.

    """
    instance = self.instance

    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    to_skip = []
    mods = [] # keeps track of needed changes

    for idx, disk in enumerate(instance.disks):
      try:
        changes = self.disks[idx]
      except KeyError:
        # Disk should not be recreated
        to_skip.append(idx)
        continue

      # update secondaries for disks, if needed
      if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
        # need to update the nodes and minors
        assert len(self.op.nodes) == 2
        assert len(disk.logical_id) == 6 # otherwise disk internals
                                         # have changed
        (_, _, old_port, _, _, old_secret) = disk.logical_id
        new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
        new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
                  new_minors[0], new_minors[1], old_secret)
        assert len(disk.logical_id) == len(new_id)
      else:
        new_id = None

      mods.append((idx, new_id, changes))

    # now that we have passed all asserts above, we can apply the mods
    # in a single run (to avoid partial changes)
    for idx, new_id, changes in mods:
      disk = instance.disks[idx]
      if new_id is not None:
        assert disk.dev_type == constants.LD_DRBD8
        disk.logical_id = new_id
      if changes:
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
                    mode=changes.get(constants.IDISK_MODE, None))

    # change primary node, if needed
    if self.op.nodes:
      instance.primary_node = self.op.nodes[0]
      self.LogWarning("Changing the instance's nodes, you will have to"
                      " remove any disks left on the older nodes manually")

    if self.op.nodes:
      self.cfg.Update(instance, feedback_fn)

    # All touched nodes must be locked
    mylocks = self.owned_locks(locking.LEVEL_NODE)
    assert mylocks.issuperset(frozenset(instance.all_nodes))
    new_disks = CreateDisks(self, instance, to_skip=to_skip)

    # TODO: Release node locks before wiping, or explain why it's not possible
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
      wipedisks = [(idx, disk, 0)
                   for (idx, disk) in enumerate(instance.disks)
                   if idx not in to_skip]
      WipeOrCleanupDisks(self, instance, disks=wipedisks, cleanup=new_disks)


def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
  """Checks if nodes have enough free disk space in the specified VG.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type nodenames: C{list}
  @param nodenames: the list of node names to check
  @type vg: C{str}
  @param vg: the volume group to check
  @type requested: C{int}
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
  nodeinfo = lu.rpc.call_node_info(nodenames, [vg], None, es_flags)
  for node in nodenames:
    info = nodeinfo[node]
    info.Raise("Cannot get current information from node %s" % node,
               prereq=True, ecode=errors.ECODE_ENVIRON)
    (_, (vg_info, ), _) = info.payload
    vg_free = vg_info.get("vg_free", None)
    if not isinstance(vg_free, int):
      raise errors.OpPrereqError("Can't compute free disk space on node"
                                 " %s for vg %s, result was '%s'" %
                                 (node, vg, vg_free), errors.ECODE_ENVIRON)
    if requested > vg_free:
      raise errors.OpPrereqError("Not enough disk space on target node %s"
                                 " vg %s: required %d MiB, available %d MiB" %
                                 (node, vg, requested, vg_free),
                                 errors.ECODE_NORES)


def CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
  """Checks if nodes have enough free disk space in all the VGs.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type nodenames: C{list}
  @param nodenames: the list of node names to check
  @type req_sizes: C{dict}
  @param req_sizes: the hash of vg and corresponding amount of disk in
      MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  for vg, req_size in req_sizes.items():
    _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)


def _DiskSizeInBytesToMebibytes(lu, size):
  """Converts a disk size in bytes to mebibytes.

  Warns and rounds up if the size isn't an even multiple of 1 MiB.

  """
  (mib, remainder) = divmod(size, 1024 * 1024)

  if remainder != 0:
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
                  " to not overwrite existing data (%s bytes will not be"
                  " wiped)", (1024 * 1024) - remainder)
    mib += 1

  return mib
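
# Worked example for the helper above: a size of 1 GiB plus one byte,
# 1073741825 bytes, gives divmod(1073741825, 1048576) == (1024, 1); the
# remainder is non-zero, so a warning about the 1048575 bytes that will not
# be wiped is logged and 1025 MiB is returned, while an exact 1 GiB
# (1073741824 bytes) returns 1024 MiB with no warning.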


def _CalcEta(time_taken, written, total_size):
  """Calculates the ETA based on size written and total size.

  @param time_taken: The time taken so far
  @param written: amount written so far
  @param total_size: The total size of data to be written
  @return: The remaining time in seconds

  """
  avg_time = time_taken / float(written)
  return (total_size - written) * avg_time
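
# Worked example for _CalcEta: if 512 MiB of a 2048 MiB disk were written in
# 30 seconds, avg_time is 30 / 512.0 (about 0.0586 s/MiB) and the estimate is
# (2048 - 512) * 30 / 512.0 == 90 seconds remaining.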


def WipeDisks(lu, instance, disks=None):
  """Wipes instance disks.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
  @param disks: Disk details; tuple contains disk index, disk object and the
    start offset

  """
  node = instance.primary_node

  if disks is None:
    disks = [(idx, disk, 0)
             for (idx, disk) in enumerate(instance.disks)]

  for (_, device, _) in disks:
    lu.cfg.SetDiskID(device, node)

  logging.info("Pausing synchronization of disks of instance '%s'",
               instance.name)
  result = lu.rpc.call_blockdev_pause_resume_sync(node,
                                                  (map(compat.snd, disks),
                                                   instance),
                                                  True)
  result.Raise("Failed to pause disk synchronization on node '%s'" % node)

  for idx, success in enumerate(result.payload):
    if not success:
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
                   " failed", idx, instance.name)

  try:
    for (idx, device, offset) in disks:
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
      wipe_chunk_size = \
        int(min(constants.MAX_WIPE_CHUNK,
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))

      size = device.size
      last_output = 0
      start_time = time.time()

      if offset == 0:
        info_text = ""
      else:
        info_text = (" (from %s to %s)" %
                     (utils.FormatUnit(offset, "h"),
                      utils.FormatUnit(size, "h")))

      lu.LogInfo("* Wiping disk %s%s", idx, info_text)

      logging.info("Wiping disk %d for instance %s on node %s using"
                   " chunk size %s", idx, instance.name, node, wipe_chunk_size)

      while offset < size:
        wipe_size = min(wipe_chunk_size, size - offset)

        logging.debug("Wiping disk %d, offset %s, chunk %s",
                      idx, offset, wipe_size)

        result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
                                           wipe_size)
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
                     (idx, offset, wipe_size))

        now = time.time()
        offset += wipe_size
        if now - last_output >= 60:
          eta = _CalcEta(now - start_time, offset, size)
          lu.LogInfo(" - done: %.1f%% ETA: %s",
                     offset / float(size) * 100, utils.FormatSeconds(eta))
          last_output = now
  finally:
    logging.info("Resuming synchronization of disks for instance '%s'",
                 instance.name)

    result = lu.rpc.call_blockdev_pause_resume_sync(node,
                                                    (map(compat.snd, disks),
                                                     instance),
                                                    False)

    if result.fail_msg:
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
                    node, result.fail_msg)
    else:
      for idx, success in enumerate(result.payload):
        if not success:
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
                        " failed", idx, instance.name)


def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
  """Wrapper for L{WipeDisks} that handles errors.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @param disks: see L{WipeDisks}
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
      case of error
  @raise errors.OpPrereqError: in case of failure

  """
  try:
    WipeDisks(lu, instance, disks=disks)
  except errors.OpExecError:
    logging.warning("Wiping disks for instance '%s' failed",
                    instance.name)
    _UndoCreateDisks(lu, cleanup)
    raise


def ExpandCheckDisks(instance, disks):
  """Return the instance disks selected by the disks list

  @type disks: list of L{objects.Disk} or None
  @param disks: selected disks
  @rtype: list of L{objects.Disk}
  @return: selected instance disks to act on

  """
  if disks is None:
    return instance.disks
  else:
    if not set(disks).issubset(instance.disks):
      raise errors.ProgrammerError("Can only act on disks belonging to the"
                                   " target instance: expected a subset of %r,"
                                   " got %r" % (instance.disks, disks))
    return disks


def WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)

  node = instance.primary_node

  for dev in disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.LogInfo("- device %s: %5.2f%% done, %s",
                   disks[i].iv_name, mstat.sync_percent, rem_time)

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.LogInfo("Instance %s's disks are in sync", instance.name)

  return not cumul_degraded


def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If the ignore_primary is false, errors on the primary node are
  ignored.

  """
  all_result = True

  if disks is None:
    # only mark instance disks as inactive if all disks are affected
    lu.cfg.MarkInstanceDisksInactive(instance.name)

  disks = ExpandCheckDisks(instance, disks)

  for disk in disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if ((node == instance.primary_node and not ignore_primary) or
            (node != instance.primary_node and not result.offline)):
          all_result = False
  return all_result


def _SafeShutdownInstanceDisks(lu, instance, disks=None):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
  ShutdownInstanceDisks(lu, instance, disks=disks)


def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
                          ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type disks: list of L{objects.Disk} or None
  @param disks: which disks to assemble (or all, if None)
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: False if the operation failed, otherwise a list of
      (host, instance_visible_name, node_visible_name)
      with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name

  if disks is None:
    # only mark instance disks as active if all disks are affected
    lu.cfg.MarkInstanceDisksActive(iname)

  disks = ExpandCheckDisks(instance, disks)

  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for idx, inst_disk in enumerate(disks):
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
                                             False, idx)
      msg = result.fail_msg
      if msg:
        is_offline_secondary = (node in instance.secondary_nodes and
                                result.offline)
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=False, pass=1): %s",
                      inst_disk.iv_name, node, msg)
        if not (ignore_secondaries or is_offline_secondary):
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for idx, inst_disk in enumerate(disks):
    dev_path = None

    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
                                             True, idx)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=True, pass=2): %s",
                      inst_disk.iv_name, node, msg)
        disks_ok = False
      else:
        dev_path, _ = result.payload

    device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  if not disks_ok:
    lu.cfg.MarkInstanceDisksInactive(iname)

  return disks_ok, device_info
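
# Shape of the return value above (sketch under assumed values): disks_ok is
# a boolean and device_info is a list of (primary_node, iv_name, dev_path)
# tuples, e.g. ("node1.example.com", "disk/0", "/dev/drbd0"), where dev_path
# is whatever the primary-node blockdev_assemble call reported, or None if
# that call failed. The node name and device path here are hypothetical.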
1276

    
1277

    
1278
def StartInstanceDisks(lu, instance, force):
1279
  """Start the disks of an instance.
1280

1281
  """
1282
  disks_ok, _ = AssembleInstanceDisks(lu, instance,
1283
                                      ignore_secondaries=force)
1284
  if not disks_ok:
1285
    ShutdownInstanceDisks(lu, instance)
1286
    if force is not None and not force:
1287
      lu.LogWarning("",
1288
                    hint=("If the message above refers to a secondary node,"
1289
                          " you can retry the operation using '--force'"))
1290
    raise errors.OpExecError("Disk consistency error")
1291

    
1292

    
1293
class LUInstanceGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.needed_locks[locking.LEVEL_NODE_RES] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      "ABSOLUTE": self.op.absolute,
      }
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    nodenames = list(instance.all_nodes)
    for node in nodenames:
      CheckNodeOnline(self, node)

    self.instance = instance

    if instance.disk_template not in constants.DTS_GROWABLE:
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing", errors.ECODE_INVAL)

    self.disk = instance.FindDisk(self.op.disk)

    if self.op.absolute:
      self.target = self.op.amount
      self.delta = self.target - self.disk.size
      if self.delta < 0:
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
                                   "current disk size (%s)" %
                                   (utils.FormatUnit(self.target, "h"),
                                    utils.FormatUnit(self.disk.size, "h")),
                                   errors.ECODE_STATE)
    else:
      self.delta = self.op.amount
      self.target = self.disk.size + self.delta
      if self.delta < 0:
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
                                   utils.FormatUnit(self.delta, "h"),
                                   errors.ECODE_INVAL)

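    # Worked example (numbers are illustrative): growing a 10240 MiB disk
    # with an absolute request of 20480 MiB yields delta = 10240, while the
    # equivalent relative request is amount = 10240 with target = 20480.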
    self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))

  def _CheckDiskSpace(self, nodenames, req_vgspace):
    template = self.instance.disk_template
    if template not in (constants.DTS_NO_FREE_SPACE_CHECK):
      # TODO: check the free disk space for file, when that feature will be
      # supported
      nodes = map(self.cfg.GetNodeInfo, nodenames)
      es_nodes = filter(lambda n: IsExclusiveStorageEnabledNode(self.cfg, n),
                        nodes)
      if es_nodes:
        # With exclusive storage we need to do something smarter than just
        # looking at free space; for now, let's simply abort the operation.
        raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
                                   " is enabled", errors.ECODE_STATE)
      CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = self.disk

    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks

    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[disk])
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block device to grow")

    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
                (self.op.disk, instance.name,
                 utils.FormatUnit(self.delta, "h"),
                 utils.FormatUnit(self.target, "h")))

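    # The grow is performed in three passes: a dry-run on every node to
    # catch problems early, the real grow of the backing storage on all
    # nodes, and finally the grow of the logical device on the primary only.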
    # First run all grow ops in dry-run mode
    for node in instance.all_nodes:
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
                                           True, True)
      result.Raise("Dry-run grow request failed to node %s" % node)

    if wipe_disks:
      # Get disk size from primary node for wiping
      self.cfg.SetDiskID(disk, instance.primary_node)
      result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk])
      result.Raise("Failed to retrieve disk size from node '%s'" %
                   instance.primary_node)

      (disk_size_in_bytes, ) = result.payload

      if disk_size_in_bytes is None:
        raise errors.OpExecError("Failed to retrieve disk size from primary"
                                 " node '%s'" % instance.primary_node)

      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)

      assert old_disk_size >= disk.size, \
        ("Retrieved disk size too small (got %s, should be at least %s)" %
         (old_disk_size, disk.size))
    else:
      old_disk_size = None

    # We know that (as far as we can test) operations across different
    # nodes will succeed, time to run it for real on the backing storage
    for node in instance.all_nodes:
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
                                           False, True)
      result.Raise("Grow request failed to node %s" % node)

    # And now execute it for logical storage, on the primary node
    node = instance.primary_node
    self.cfg.SetDiskID(disk, node)
    result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
                                         False, False)
    result.Raise("Grow request failed to node %s" % node)

    disk.RecordGrow(self.delta)
    self.cfg.Update(instance, feedback_fn)

    # Changes have been recorded, release node lock
    ReleaseLocks(self, locking.LEVEL_NODE)

    # Downgrade lock while waiting for sync
    self.glm.downgrade(locking.LEVEL_INSTANCE)

    assert wipe_disks ^ (old_disk_size is None)

    if wipe_disks:
      assert instance.disks[self.op.disk] == disk

      # Wipe newly added disk space
      WipeDisks(self, instance,
                disks=[(self.op.disk, disk, old_disk_size)])

    if self.op.wait_for_sync:
      disk_abort = not WaitForSync(self, instance, disks=[disk])
      if disk_abort:
        self.LogWarning("Disk syncing has not returned a good status; check"
                        " the instance")
      if not instance.disks_active:
        _SafeShutdownInstanceDisks(self, instance, disks=[disk])
    elif not instance.disks_active:
      self.LogWarning("Not shutting down the disk even if the instance is"
                      " not supposed to be running because no wait for"
                      " sync mode was requested")

    assert self.owned_locks(locking.LEVEL_NODE_RES)
    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)

class LUInstanceReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check arguments.

    """
    remote_node = self.op.remote_node
    ialloc = self.op.iallocator
    if self.op.mode == constants.REPLACE_DISK_CHG:
      if remote_node is None and ialloc is None:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given", errors.ECODE_INVAL)
      else:
        CheckIAllocatorOrNode(self, "iallocator", "remote_node")

    elif remote_node is not None or ialloc is not None:
      # Not replacing the secondary
      raise errors.OpPrereqError("The iallocator and new node options can"
                                 " only be used when changing the"
                                 " secondary node", errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    assert locking.LEVEL_NODE not in self.needed_locks
    assert locking.LEVEL_NODE_RES not in self.needed_locks
    assert locking.LEVEL_NODEGROUP not in self.needed_locks

    assert self.op.iallocator is None or self.op.remote_node is None, \
      "Conflicting options"

    if self.op.remote_node is not None:
      self.op.remote_node = ExpandNodeName(self.cfg, self.op.remote_node)

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

      if self.op.iallocator is not None:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

    self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
                                   self.op.iallocator, self.op.remote_node,
                                   self.op.disks, self.op.early_release,
                                   self.op.ignore_ipolicy)

    self.tasklets = [self.replacer]

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.remote_node is None
      assert self.op.iallocator is not None
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock all groups used by instance optimistically; this requires going
      # via the node before it's locked, requiring verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_name)

    elif level == locking.LEVEL_NODE:
      if self.op.iallocator is not None:
        assert self.op.remote_node is None
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)

        # Lock member nodes of all locked groups
        self.needed_locks[locking.LEVEL_NODE] = \
          [node_name
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
           for node_name in self.cfg.GetNodeGroup(group_uuid).members]
      else:
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

        self._LockInstancesNodes()

    elif level == locking.LEVEL_NODE_RES:
      # Reuse node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        self.needed_locks[locking.LEVEL_NODE]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    instance = self.replacer.instance
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": instance.secondary_nodes[0],
      }
    env.update(BuildInstanceHookEnvByObject(self, instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self.replacer.instance
    nl = [
      self.cfg.GetMasterNode(),
      instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
            self.op.iallocator is None)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)

    return LogicalUnit.CheckPrereq(self)

class LUInstanceActivateDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              AssembleInstanceDisks(self, self.instance,
                                    ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    if self.op.wait_for_sync:
      if not WaitForSync(self, self.instance):
        self.cfg.MarkInstanceDisksInactive(self.instance.name)
        raise errors.OpExecError("Some disks of the instance are degraded!")

    return disks_info

class LUInstanceDeactivateDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    if self.op.force:
      ShutdownInstanceDisks(self, instance)
    else:
      _SafeShutdownInstanceDisks(self, instance)

def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
                               ldisk=False):
  """Check that mirrors are not degraded.

  @attention: The device has to be annotated already.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
                                                     on_primary)

  return result


def CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
  """Wrapper around L{_CheckDiskConsistencyInner}.

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
                                    ldisk=ldisk)


def _BlockdevFind(lu, node, dev, instance):
  """Wrapper around call_blockdev_find to annotate diskparams.

  @param lu: A reference to the lu object
  @param node: The node to call out
  @param dev: The device to find
  @param instance: The instance object the device belongs to
  @returns The result of the rpc call

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return lu.rpc.call_blockdev_find(node, disk)


def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
    results.append("%s%s" % (new_id, val))
  return results

class TLReplaceDisks(Tasklet):
  """Replaces disks for an instance.

  Note: Locking is not within the scope of this class.

  """
  def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
               disks, early_release, ignore_ipolicy):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_name = instance_name
    self.mode = mode
    self.iallocator_name = iallocator_name
    self.remote_node = remote_node
    self.disks = disks
    self.early_release = early_release
    self.ignore_ipolicy = ignore_ipolicy

    # Runtime data
    self.instance = None
    self.new_node = None
    self.target_node = None
    self.other_node = None
    self.remote_node_info = None
    self.node_secondary_ip = None

  @staticmethod
  def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
    """Compute a new secondary node using an IAllocator.

    """
    req = iallocator.IAReqRelocate(name=instance_name,
                                   relocate_from=list(relocate_from))
    ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)

    ial.Run(iallocator_name)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (iallocator_name, ial.info),
                                 errors.ECODE_NORES)

    remote_node_name = ial.result[0]

    lu.LogInfo("Selected new secondary for instance '%s': %s",
               instance_name, remote_node_name)

    return remote_node_name

  def _FindFaultyDisks(self, node_name):
    """Wrapper for L{FindFaultyInstanceDisks}.

    """
    return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
                                   node_name, True)

  def _CheckDisksActivated(self, instance):
    """Checks if the instance disks are activated.

    @param instance: The instance to check disks
    @return: True if they are activated, False otherwise

    """
    nodes = instance.all_nodes

    for idx, dev in enumerate(instance.disks):
      for node in nodes:
        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
        self.cfg.SetDiskID(dev, node)

        result = _BlockdevFind(self, node, dev, instance)

        if result.offline:
          continue
        elif result.fail_msg or not result.payload:
          return False

    return True

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.instance_name

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances", errors.ECODE_INVAL)

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes),
                                 errors.ECODE_FAULT)

    instance = self.instance
    secondary_node = instance.secondary_nodes[0]

    if self.iallocator_name is None:
      remote_node = self.remote_node
    else:
      remote_node = self._RunAllocator(self.lu, self.iallocator_name,
                                       instance.name, instance.secondary_nodes)

    if remote_node is None:
      self.remote_node_info = None
    else:
      assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \
             "Remote node '%s' is not locked" % remote_node

      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node

    if remote_node == self.instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance", errors.ECODE_INVAL)

    if remote_node == secondary_node:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance",
                                 errors.ECODE_INVAL)

    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
                                    constants.REPLACE_DISK_CHG):
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
                                 errors.ECODE_INVAL)

    if self.mode == constants.REPLACE_DISK_AUTO:
      if not self._CheckDisksActivated(instance):
        raise errors.OpPrereqError("Please run activate-disks on instance %s"
                                   " first" % self.instance_name,
                                   errors.ECODE_STATE)
      faulty_primary = self._FindFaultyDisks(instance.primary_node)
      faulty_secondary = self._FindFaultyDisks(secondary_node)

      if faulty_primary and faulty_secondary:
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
                                   " one node and can not be repaired"
                                   " automatically" % self.instance_name,
                                   errors.ECODE_STATE)

      if faulty_primary:
        self.disks = faulty_primary
        self.target_node = instance.primary_node
        self.other_node = secondary_node
        check_nodes = [self.target_node, self.other_node]
      elif faulty_secondary:
        self.disks = faulty_secondary
        self.target_node = secondary_node
        self.other_node = instance.primary_node
        check_nodes = [self.target_node, self.other_node]
      else:
        self.disks = []
        check_nodes = []

    else:
      # Non-automatic modes
      if self.mode == constants.REPLACE_DISK_PRI:
        self.target_node = instance.primary_node
        self.other_node = secondary_node
        check_nodes = [self.target_node, self.other_node]

      elif self.mode == constants.REPLACE_DISK_SEC:
        self.target_node = secondary_node
        self.other_node = instance.primary_node
        check_nodes = [self.target_node, self.other_node]

      elif self.mode == constants.REPLACE_DISK_CHG:
        self.new_node = remote_node
        self.other_node = instance.primary_node
        self.target_node = secondary_node
        check_nodes = [self.new_node, self.other_node]

        CheckNodeNotDrained(self.lu, remote_node)
        CheckNodeVmCapable(self.lu, remote_node)

        old_node_info = self.cfg.GetNodeInfo(secondary_node)
        assert old_node_info is not None
        if old_node_info.offline and not self.early_release:
          # doesn't make sense to delay the release
          self.early_release = True
          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
                          " early-release mode", secondary_node)

      else:
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                     self.mode)

      # If not specified all disks should be replaced
      if not self.disks:
        self.disks = range(len(self.instance.disks))

    # TODO: This is ugly, but right now we can't distinguish between internal
    # submitted opcode and external one. We should fix that.
    if self.remote_node_info:
      # We change the node, let's verify it still meets instance policy
      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
      cluster = self.cfg.GetClusterInfo()
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              new_group_info)
      CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
                             self.cfg, ignore=self.ignore_ipolicy)

    for node in check_nodes:
      CheckNodeOnline(self.lu, node)

    touched_nodes = frozenset(node_name for node_name in [self.new_node,
                                                          self.other_node,
                                                          self.target_node]
                              if node_name is not None)

    # Release unneeded node and node resource locks
    ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    # Release any owned node group
    ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)

    # Check whether disks are valid
    for disk_idx in self.disks:
      instance.FindDisk(disk_idx)

    # Get secondary node IP addresses
    self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node)
                                  in self.cfg.GetMultiNodeInfo(touched_nodes))

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    if __debug__:
      # Verify owned locks before starting operation
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
      assert set(owned_nodes) == set(self.node_secondary_ip), \
          ("Incorrect node locks, owning %s, expected %s" %
           (owned_nodes, self.node_secondary_ip.keys()))
      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
              self.lu.owned_locks(locking.LEVEL_NODE_RES))
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
      assert list(owned_instances) == [self.instance_name], \
          "Instance '%s' not locked" % self.instance_name

      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
          "Should not own any node group lock at this point"

    if not self.disks:
      feedback_fn("No disks need replacement for instance '%s'" %
                  self.instance.name)
      return

    feedback_fn("Replacing disk(s) %s for instance '%s'" %
                (utils.CommaJoin(self.disks), self.instance.name))
    feedback_fn("Current primary node: %s" % self.instance.primary_node)
    feedback_fn("Current secondary node: %s" %
                utils.CommaJoin(self.instance.secondary_nodes))

    activate_disks = not self.instance.disks_active

    # Activate the instance disks if we're replacing them on a down instance
    if activate_disks:
      StartInstanceDisks(self.lu, self.instance, True)

    try:
      # Should we replace the secondary node?
      if self.new_node is not None:
        fn = self._ExecDrbd8Secondary
      else:
        fn = self._ExecDrbd8DiskOnly

      result = fn(feedback_fn)
    finally:
      # Deactivate the instance disks if we're replacing them on a
      # down instance
      if activate_disks:
        _SafeShutdownInstanceDisks(self.lu, self.instance)

    assert not self.lu.owned_locks(locking.LEVEL_NODE)

    if __debug__:
      # Verify owned locks
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
      nodes = frozenset(self.node_secondary_ip)
      assert ((self.early_release and not owned_nodes) or
              (not self.early_release and not (set(owned_nodes) - nodes))), \
        ("Not owning the correct locks, early_release=%s, owned=%r,"
         " nodes=%r" % (self.early_release, owned_nodes, nodes))

    return result

  def _CheckVolumeGroup(self, nodes):
    self.lu.LogInfo("Checking volume groups")

    vgname = self.cfg.GetVGName()

    # Make sure volume group exists on all involved nodes
    results = self.rpc.call_vg_list(nodes)
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")

    for node in nodes:
      res = results[node]
      res.Raise("Error checking node %s" % node)
      if vgname not in res.payload:
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
                                 (vgname, node))

  def _CheckDisksExistence(self, nodes):
    # Check disk existence
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      for node in nodes:
        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
        self.cfg.SetDiskID(dev, node)

        result = _BlockdevFind(self, node, dev, self.instance)

        msg = result.fail_msg
        if msg or not result.payload:
          if not msg:
            msg = "disk not found"
          if not self._CheckDisksActivated(self.instance):
            extra_hint = ("\nDisks seem to be not properly activated. Try"
                          " running activate-disks on the instance before"
                          " using replace-disks.")
          else:
            extra_hint = ""
          raise errors.OpExecError("Can't find disk/%d on node %s: %s%s" %
                                   (idx, node, msg, extra_hint))

  def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                      (idx, node_name))

      if not CheckDiskConsistency(self.lu, self.instance, dev, node_name,
                                  on_primary, ldisk=ldisk):
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                                 " replace disks for instance %s" %
                                 (node_name, self.instance.name))

  def _CreateNewStorage(self, node_name):
    """Create new storage on the primary or secondary node.

    This is only used for same-node replaces, not for changing the
    secondary node, hence we don't want to modify the existing disk.

    """
    iv_names = {}

    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    for idx, dev in enumerate(disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx)

      self.cfg.SetDiskID(dev, node_name)

      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
      names = _GenerateUniqueNames(self.lu, lv_names)

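      # Each DRBD disk is backed by a data LV and a small metadata LV; the
      # new pair is created below under the freshly generated unique names
      # (e.g. "<uuid>.disk0_data" / "<uuid>.disk0_meta", names illustrative).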
      (data_disk, meta_disk) = dev.children
      vg_data = data_disk.logical_id[0]
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
                             logical_id=(vg_data, names[0]),
                             params=data_disk.params)
      vg_meta = meta_disk.logical_id[0]
      lv_meta = objects.Disk(dev_type=constants.LD_LV,
                             size=constants.DRBD_META_SIZE,
                             logical_id=(vg_meta, names[1]),
                             params=meta_disk.params)

      new_lvs = [lv_data, lv_meta]
      old_lvs = [child.Copy() for child in dev.children]
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)

      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        try:
          _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
                               GetInstanceInfoText(self.instance), False,
                               excl_stor)
        except errors.DeviceCreationError, e:
          raise errors.OpExecError("Can't create block device: %s" % e.message)

    return iv_names

  def _CheckDevices(self, node_name, iv_names):
    for name, (dev, _, _) in iv_names.iteritems():
      self.cfg.SetDiskID(dev, node_name)

      result = _BlockdevFind(self, node_name, dev, self.instance)

      msg = result.fail_msg
      if msg or not result.payload:
        if not msg:
          msg = "disk not found"
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))

      if result.payload.is_degraded:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

  def _RemoveOldStorage(self, node_name, iv_names):
    for name, (_, old_lvs, _) in iv_names.iteritems():
      self.lu.LogInfo("Remove logical volumes for %s", name)

      for lv in old_lvs:
        self.cfg.SetDiskID(lv, node_name)

        msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
        if msg:
          self.lu.LogWarning("Can't remove old LV: %s", msg,
                             hint="remove unused LVs manually")

  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
    """Replace a disk on the primary or secondary for DRBD 8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.other_node, self.target_node])
    self._CheckVolumeGroup([self.target_node, self.other_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.other_node,
                                self.other_node == self.instance.primary_node,
                                False)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    iv_names = self._CreateNewStorage(self.target_node)

    # Step: for each lv, detach+rename*2+attach
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)

      result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
                                                     old_lvs)
      result.Raise("Can't detach drbd from local storage on node"
                   " %s for device %s" % (self.target_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)

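      # For example (hypothetical names), an old LV "xenvg/uuid.disk0_data"
      # would be renamed to "xenvg/uuid.disk0_data_replaced-<time_t>", while
      # the freshly created LV then takes over the original name.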
      # Build the rename list based on what LVs exist on the node
      rename_old_to_new = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(self.target_node, to_ren)
        if not result.fail_msg and result.payload:
          # device exists
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))

      self.lu.LogInfo("Renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(self.target_node,
                                             rename_old_to_new)
      result.Raise("Can't rename old LVs on node %s" % self.target_node)

      # Now we rename the new LVs to the old LVs
      self.lu.LogInfo("Renaming the new LVs on the target node")
      rename_new_to_old = [(new, old.physical_id)
                           for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(self.target_node,
                                             rename_new_to_old)
      result.Raise("Can't rename new LVs on node %s" % self.target_node)

      # Intermediate steps of in memory modifications
      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        self.cfg.SetDiskID(new, self.target_node)

      # We need to modify old_lvs so that removal later removes the
      # right LVs, not the newly added ones; note that old_lvs is a
      # copy here
      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        self.cfg.SetDiskID(disk, self.target_node)

      # Now that the new lvs have the old name, we can add them to the device
      self.lu.LogInfo("Adding new mirror component on %s", self.target_node)
      result = self.rpc.call_blockdev_addchildren(self.target_node,
                                                  (dev, self.instance), new_lvs)
      msg = result.fail_msg
      if msg:
        for new_lv in new_lvs:
          msg2 = self.rpc.call_blockdev_remove(self.target_node,
                                               new_lv).fail_msg
          if msg2:
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                               hint=("cleanup manually the unused logical"
                                     " volumes"))
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)

    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # Release all node locks while waiting for sync
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)

  def _ExecDrbd8Secondary(self, feedback_fn):
    """Replace the secondary node for DRBD 8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6

    pnode = self.instance.primary_node

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.instance.primary_node])
    self._CheckVolumeGroup([self.instance.primary_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.instance.primary_node, True, True)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
    for idx, dev in enumerate(disks):
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
                      (self.new_node, idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        try:
          _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
                               True, GetInstanceInfoText(self.instance), False,
                               excl_stor)
        except errors.DeviceCreationError, e:
          raise errors.OpExecError("Can't create block device: %s" % e.message)

    # Step 4: drbd minors and drbd setups changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    minors = self.cfg.AllocateDRBDMinor([self.new_node
                                         for dev in self.instance.disks],
                                        self.instance.name)
    logging.debug("Allocated minors %r", minors)

    iv_names = {}
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
                      (self.new_node, idx))
      # create new devices on new_node; note that we create two IDs:
      # one without port, so the drbd will be activated without
      # networking information on the new node at this stage, and one
      # with network, for the latter activation in step 4
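      # A DRBD8 logical_id is the 6-tuple
      # (node_A, node_B, port, minor_A, minor_B, shared_secret); below we
      # pick out the minor that belongs to the primary node.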
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if self.instance.primary_node == o_node1:
        p_minor = o_minor1
      else:
        assert self.instance.primary_node == o_node2, "Three-node instance?"
        p_minor = o_minor2

      new_alone_id = (self.instance.primary_node, self.new_node, None,
                      p_minor, new_minor, o_secret)
      new_net_id = (self.instance.primary_node, self.new_node, o_port,
                    p_minor, new_minor, o_secret)

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children,
                              size=dev.size,
                              params={})
      (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
                                            self.cfg)
      try:
        CreateSingleBlockDev(self.lu, self.new_node, self.instance,
                             anno_new_drbd,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)
      except errors.GenericError:
        self.cfg.ReleaseDRBDMinors(self.instance.name)
        raise

    # We have new devices, shutdown the drbd on the old secondary
    for idx, dev in enumerate(self.instance.disks):
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
      self.cfg.SetDiskID(dev, self.target_node)
      msg = self.rpc.call_blockdev_shutdown(self.target_node,
                                            (dev, self.instance)).fail_msg
      if msg:
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                           " node: %s" % (idx, msg),
                           hint=("Please cleanup this device manually as"
                                 " soon as possible"))

    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
    result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
                                               self.instance.disks)[pnode]

    msg = result.fail_msg
    if msg:
      # detaches didn't succeed (unlikely)
      self.cfg.ReleaseDRBDMinors(self.instance.name)
      raise errors.OpExecError("Can't detach the disks from the network on"
                               " old node: %s" % (msg,))

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    self.lu.LogInfo("Updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      self.cfg.SetDiskID(dev, self.instance.primary_node)

    self.cfg.Update(self.instance, feedback_fn)

    # Release all node locks (the configuration has been updated)
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # and now perform the drbd attach
    self.lu.LogInfo("Attaching primary drbds to new secondary"
                    " (standalone => connected)")
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
                                            self.new_node],
                                           self.node_secondary_ip,
                                           (self.instance.disks, self.instance),
                                           self.instance.name,
                                           False)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
                           to_node, msg,
                           hint=("please do a gnt-instance info to see the"
                                 " status of disks"))

    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)