1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with storage of instances."""
23

    
24
import itertools
25
import logging
26
import os
27
import time
28

    
29
from ganeti import compat
30
from ganeti import constants
31
from ganeti import errors
32
from ganeti import ht
33
from ganeti import locking
34
from ganeti.masterd import iallocator
35
from ganeti import objects
36
from ganeti import utils
37
from ganeti import opcodes
38
from ganeti import rpc
39
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
40
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
41
  AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeName, \
42
  CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
43
  IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks
44
from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
45
  CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
46
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
47

    
48
import ganeti.masterd.instance
49

    
50

    
51
_DISK_TEMPLATE_NAME_PREFIX = {
52
  constants.DT_PLAIN: "",
53
  constants.DT_RBD: ".rbd",
54
  constants.DT_EXT: ".ext",
55
  }
56

    
57

    
58
_DISK_TEMPLATE_DEVICE_TYPE = {
59
  constants.DT_PLAIN: constants.LD_LV,
60
  constants.DT_FILE: constants.LD_FILE,
61
  constants.DT_SHARED_FILE: constants.LD_FILE,
62
  constants.DT_BLOCK: constants.LD_BLOCKDEV,
63
  constants.DT_RBD: constants.LD_RBD,
64
  constants.DT_EXT: constants.LD_EXT,
65
  }
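# Diskless and DRBD8 are absent from the map above: GenerateDiskTemplate
# handles these two templates explicitly before looking up a device type.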
66

    
67

    
68
def CreateSingleBlockDev(lu, node, instance, device, info, force_open,
69
                         excl_stor):
70
  """Create a single block device on a given node.
71

72
  This will not recurse over children of the device, so they must be
73
  created in advance.
74

75
  @param lu: the lu on whose behalf we execute
76
  @param node: the node on which to create the device
77
  @type instance: L{objects.Instance}
78
  @param instance: the instance which owns the device
79
  @type device: L{objects.Disk}
80
  @param device: the device to create
81
  @param info: the extra 'metadata' we should attach to the device
82
      (this will be represented as an LVM tag)
83
  @type force_open: boolean
84
  @param force_open: this parameter will be passed to the
85
      L{backend.BlockdevCreate} function where it specifies
86
      whether we run on primary or not, and it affects both
87
      the child assembly and the device's own Open() execution
88
  @type excl_stor: boolean
89
  @param excl_stor: Whether exclusive_storage is active for the node
90

91
  """
92
  lu.cfg.SetDiskID(device, node)
93
  result = lu.rpc.call_blockdev_create(node, device, device.size,
94
                                       instance.name, force_open, info,
95
                                       excl_stor)
96
  result.Raise("Can't create block device %s on"
97
               " node %s for instance %s" % (device, node, instance.name))
98
  if device.physical_id is None:
99
    device.physical_id = result.payload
100

    
101

    
102
def _CreateBlockDevInner(lu, node, instance, device, force_create,
103
                         info, force_open, excl_stor):
104
  """Create a tree of block devices on a given node.
105

106
  If this device type has to be created on secondaries, create it and
107
  all its children.
108

109
  If not, just recurse to children keeping the same 'force' value.
110

111
  @attention: The device has to be annotated already.
112

113
  @param lu: the lu on whose behalf we execute
114
  @param node: the node on which to create the device
115
  @type instance: L{objects.Instance}
116
  @param instance: the instance which owns the device
117
  @type device: L{objects.Disk}
118
  @param device: the device to create
119
  @type force_create: boolean
120
  @param force_create: whether to force creation of this device; this
121
      will be changed to True whenever we find a device which has
122
      CreateOnSecondary() attribute
123
  @param info: the extra 'metadata' we should attach to the device
124
      (this will be represented as an LVM tag)
125
  @type force_open: boolean
126
  @param force_open: this parameter will be passed to the
127
      L{backend.BlockdevCreate} function where it specifies
128
      whether we run on primary or not, and it affects both
129
      the child assembly and the device's own Open() execution
130
  @type excl_stor: boolean
131
  @param excl_stor: Whether exclusive_storage is active for the node
132

133
  @return: list of created devices
134
  """
135
  created_devices = []
136
  try:
137
    if device.CreateOnSecondary():
138
      force_create = True
139

    
140
    if device.children:
141
      for child in device.children:
142
        devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
143
                                    info, force_open, excl_stor)
144
        created_devices.extend(devs)
145

    
146
    if not force_create:
147
      return created_devices
148

    
149
    CreateSingleBlockDev(lu, node, instance, device, info, force_open,
150
                         excl_stor)
151
    # The device has been completely created, so there is no point in keeping
152
    # its subdevices in the list. We just add the device itself instead.
153
    created_devices = [(node, device)]
154
    return created_devices
155

    
156
  except errors.DeviceCreationError, e:
157
    e.created_devices.extend(created_devices)
158
    raise e
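  # Wrap any other execution error below so that callers also receive the
  # devices created so far and can clean them up (see _UndoCreateDisks).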
159
  except errors.OpExecError, e:
160
    raise errors.DeviceCreationError(str(e), created_devices)
161

    
162

    
163
def IsExclusiveStorageEnabledNodeName(cfg, nodename):
164
  """Whether exclusive_storage is in effect for the given node.
165

166
  @type cfg: L{config.ConfigWriter}
167
  @param cfg: The cluster configuration
168
  @type nodename: string
169
  @param nodename: The node
170
  @rtype: bool
171
  @return: The effective value of exclusive_storage
172
  @raise errors.OpPrereqError: if no node exists with the given name
173

174
  """
175
  ni = cfg.GetNodeInfo(nodename)
176
  if ni is None:
177
    raise errors.OpPrereqError("Invalid node name %s" % nodename,
178
                               errors.ECODE_NOENT)
179
  return IsExclusiveStorageEnabledNode(cfg, ni)
180

    
181

    
182
def _CreateBlockDev(lu, node, instance, device, force_create, info,
183
                    force_open):
184
  """Wrapper around L{_CreateBlockDevInner}.
185

186
  This method annotates the root device first.
187

188
  """
189
  (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
190
  excl_stor = IsExclusiveStorageEnabledNodeName(lu.cfg, node)
191
  return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
192
                              force_open, excl_stor)
193

    
194

    
195
def _UndoCreateDisks(lu, disks_created):
196
  """Undo the work performed by L{CreateDisks}.
197

198
  This function is called in case of an error to undo the work of
199
  L{CreateDisks}.
200

201
  @type lu: L{LogicalUnit}
202
  @param lu: the logical unit on whose behalf we execute
203
  @param disks_created: the result returned by L{CreateDisks}
204

205
  """
206
  for (node, disk) in disks_created:
207
    lu.cfg.SetDiskID(disk, node)
208
    result = lu.rpc.call_blockdev_remove(node, disk)
209
    if result.fail_msg:
210
      logging.warning("Failed to remove newly-created disk %s on node %s:"
211
                      " %s", disk, node, result.fail_msg)
212

    
213

    
214
def CreateDisks(lu, instance, to_skip=None, target_node=None, disks=None):
215
  """Create all disks for an instance.
216

217
  This abstracts away some work from AddInstance.
218

219
  @type lu: L{LogicalUnit}
220
  @param lu: the logical unit on whose behalf we execute
221
  @type instance: L{objects.Instance}
222
  @param instance: the instance whose disks we should create
223
  @type to_skip: list
224
  @param to_skip: list of indices to skip
225
  @type target_node: string
226
  @param target_node: if passed, overrides the target node for creation
227
  @type disks: list of L{objects.Disk}
228
  @param disks: the disks to create; if not specified, all the disks of the
229
      instance are created
230
  @return: information about the created disks, to be used to call
231
      L{_UndoCreateDisks}
232
  @raise errors.OpPrereqError: in case of error
233

234
  """
235
  info = GetInstanceInfoText(instance)
236
  if target_node is None:
237
    pnode = instance.primary_node
238
    all_nodes = instance.all_nodes
239
  else:
240
    pnode = target_node
241
    all_nodes = [pnode]
242

    
243
  if disks is None:
244
    disks = instance.disks
245

    
246
  if instance.disk_template in constants.DTS_FILEBASED:
247
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
248
    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
249

    
250
    result.Raise("Failed to create directory '%s' on"
251
                 " node %s" % (file_storage_dir, pnode))
252

    
253
  disks_created = []
254
  for idx, device in enumerate(disks):
255
    if to_skip and idx in to_skip:
256
      continue
257
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
258
    for node in all_nodes:
259
      f_create = node == pnode
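      # Creation and opening are forced only on the primary node; on
      # secondaries only device types reporting CreateOnSecondary() are
      # created (see _CreateBlockDevInner).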
260
      try:
261
        _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
262
        disks_created.append((node, device))
263
      except errors.DeviceCreationError, e:
264
        logging.warning("Creating disk %s for instance '%s' failed",
265
                        idx, instance.name)
266
        disks_created.extend(e.created_devices)
267
        _UndoCreateDisks(lu, disks_created)
268
        raise errors.OpExecError(e.message)
269
  return disks_created
270

    
271

    
272
def ComputeDiskSizePerVG(disk_template, disks):
273
  """Compute disk size requirements in the volume group
274

275
  """
276
  def _compute(disks, payload):
277
    """Universal algorithm.
278

279
    """
280
    vgs = {}
281
    for disk in disks:
282
      vgs[disk[constants.IDISK_VG]] = \
283
        vgs.get(disk[constants.IDISK_VG], 0) + \
        disk[constants.IDISK_SIZE] + payload
284

    
285
    return vgs
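  # _compute example: two 1024 MiB disks in volume group 'xenvg' with a
  # 128 MiB payload per disk yield {'xenvg': 2304}.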
286

    
287
  # Required free disk space as a function of disk and swap space
288
  req_size_dict = {
289
    constants.DT_DISKLESS: {},
290
    constants.DT_PLAIN: _compute(disks, 0),
291
    # 128 MiB are added for DRBD metadata for each disk
292
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
293
    constants.DT_FILE: {},
294
    constants.DT_SHARED_FILE: {},
295
    }
296

    
297
  if disk_template not in req_size_dict:
298
    raise errors.ProgrammerError("Disk template '%s' size requirement"
299
                                 " is unknown" % disk_template)
300

    
301
  return req_size_dict[disk_template]
302

    
303

    
304
def ComputeDisks(op, default_vg):
305
  """Computes the instance disks.
306

307
  @param op: The instance opcode
308
  @param default_vg: The default_vg to assume
309

310
  @return: The computed disks
311

312
  """
313
  disks = []
314
  for disk in op.disks:
315
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
316
    if mode not in constants.DISK_ACCESS_SET:
317
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
318
                                 mode, errors.ECODE_INVAL)
319
    size = disk.get(constants.IDISK_SIZE, None)
320
    if size is None:
321
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
322
    try:
323
      size = int(size)
324
    except (TypeError, ValueError):
325
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
326
                                 errors.ECODE_INVAL)
327

    
328
    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
329
    if ext_provider and op.disk_template != constants.DT_EXT:
330
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
331
                                 " disk template, not %s" %
332
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
333
                                  op.disk_template), errors.ECODE_INVAL)
334

    
335
    data_vg = disk.get(constants.IDISK_VG, default_vg)
336
    name = disk.get(constants.IDISK_NAME, None)
337
    if name is not None and name.lower() == constants.VALUE_NONE:
338
      name = None
339
    new_disk = {
340
      constants.IDISK_SIZE: size,
341
      constants.IDISK_MODE: mode,
342
      constants.IDISK_VG: data_vg,
343
      constants.IDISK_NAME: name,
344
      }
345

    
346
    if constants.IDISK_METAVG in disk:
347
      new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG]
348
    if constants.IDISK_ADOPT in disk:
349
      new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]
350

    
351
    # For extstorage, demand the `provider' option and add any
352
    # additional parameters (ext-params) to the dict
353
    if op.disk_template == constants.DT_EXT:
354
      if ext_provider:
355
        new_disk[constants.IDISK_PROVIDER] = ext_provider
356
        for key in disk:
357
          if key not in constants.IDISK_PARAMS:
358
            new_disk[key] = disk[key]
359
      else:
360
        raise errors.OpPrereqError("Missing provider for template '%s'" %
361
                                   constants.DT_EXT, errors.ECODE_INVAL)
362

    
363
    disks.append(new_disk)
364

    
365
  return disks
366

    
367

    
368
def CheckRADOSFreeSpace():
369
  """Compute disk size requirements inside the RADOS cluster.
370

371
  """
372
  # For the RADOS cluster we assume there is always enough space.
373
  pass
374

    
375

    
376
def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
377
                         iv_name, p_minor, s_minor):
378
  """Generate a drbd8 device complete with its children.
379

380
  """
381
  assert len(vgnames) == len(names) == 2
382
  port = lu.cfg.AllocatePort()
383
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
384

    
385
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
386
                          logical_id=(vgnames[0], names[0]),
387
                          params={})
388
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
389
  dev_meta = objects.Disk(dev_type=constants.LD_LV,
390
                          size=constants.DRBD_META_SIZE,
391
                          logical_id=(vgnames[1], names[1]),
392
                          params={})
393
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
394
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
395
                          logical_id=(primary, secondary, port,
396
                                      p_minor, s_minor,
397
                                      shared_secret),
398
                          children=[dev_data, dev_meta],
399
                          iv_name=iv_name, params={})
400
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
401
  return drbd_dev
402

    
403

    
404
def GenerateDiskTemplate(
405
  lu, template_name, instance_name, primary_node, secondary_nodes,
406
  disk_info, file_storage_dir, file_driver, base_index,
407
  feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
408
  _req_shr_file_storage=opcodes.RequireSharedFileStorage):
409
  """Generate the entire disk layout for a given template type.
410

411
  """
412
  vgname = lu.cfg.GetVGName()
413
  disk_count = len(disk_info)
414
  disks = []
415

    
416
  if template_name == constants.DT_DISKLESS:
417
    pass
418
  elif template_name == constants.DT_DRBD8:
419
    if len(secondary_nodes) != 1:
420
      raise errors.ProgrammerError("Wrong template configuration")
421
    remote_node = secondary_nodes[0]
422
    minors = lu.cfg.AllocateDRBDMinor(
423
      [primary_node, remote_node] * len(disk_info), instance_name)
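    # One minor was requested above per (node, instance) pair, so
    # minors[2 * idx] and minors[2 * idx + 1] are used below as the primary
    # and secondary minors of disk number idx.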
424

    
425
    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
426
                                                       full_disk_params)
427
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
428

    
429
    names = []
430
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
431
                                               for i in range(disk_count)]):
432
      names.append(lv_prefix + "_data")
433
      names.append(lv_prefix + "_meta")
434
    for idx, disk in enumerate(disk_info):
435
      disk_index = idx + base_index
436
      data_vg = disk.get(constants.IDISK_VG, vgname)
437
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
438
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
439
                                      disk[constants.IDISK_SIZE],
440
                                      [data_vg, meta_vg],
441
                                      names[idx * 2:idx * 2 + 2],
442
                                      "disk/%d" % disk_index,
443
                                      minors[idx * 2], minors[idx * 2 + 1])
444
      disk_dev.mode = disk[constants.IDISK_MODE]
445
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
446
      disks.append(disk_dev)
447
  else:
448
    if secondary_nodes:
449
      raise errors.ProgrammerError("Wrong template configuration")
450

    
451
    if template_name == constants.DT_FILE:
452
      _req_file_storage()
453
    elif template_name == constants.DT_SHARED_FILE:
454
      _req_shr_file_storage()
455

    
456
    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
457
    if name_prefix is None:
458
      names = None
459
    else:
460
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
461
                                        (name_prefix, base_index + i)
462
                                        for i in range(disk_count)])
463

    
464
    if template_name == constants.DT_PLAIN:
465

    
466
      def logical_id_fn(idx, _, disk):
467
        vg = disk.get(constants.IDISK_VG, vgname)
468
        return (vg, names[idx])
469

    
470
    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
471
      logical_id_fn = \
472
        lambda _, disk_index, disk: (file_driver,
473
                                     "%s/disk%d" % (file_storage_dir,
474
                                                    disk_index))
475
    elif template_name == constants.DT_BLOCK:
476
      logical_id_fn = \
477
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
478
                                       disk[constants.IDISK_ADOPT])
479
    elif template_name == constants.DT_RBD:
480
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
481
    elif template_name == constants.DT_EXT:
482
      def logical_id_fn(idx, _, disk):
483
        provider = disk.get(constants.IDISK_PROVIDER, None)
484
        if provider is None:
485
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
486
                                       " not found", constants.DT_EXT,
487
                                       constants.IDISK_PROVIDER)
488
        return (provider, names[idx])
489
    else:
490
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
491

    
492
    dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
493

    
494
    for idx, disk in enumerate(disk_info):
495
      params = {}
496
      # Only for the Ext template add disk_info to params
497
      if template_name == constants.DT_EXT:
498
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
499
        for key in disk:
500
          if key not in constants.IDISK_PARAMS:
501
            params[key] = disk[key]
502
      disk_index = idx + base_index
503
      size = disk[constants.IDISK_SIZE]
504
      feedback_fn("* disk %s, size %s" %
505
                  (disk_index, utils.FormatUnit(size, "h")))
506
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
507
                              logical_id=logical_id_fn(idx, disk_index, disk),
508
                              iv_name="disk/%d" % disk_index,
509
                              mode=disk[constants.IDISK_MODE],
510
                              params=params)
511
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
512
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
513
      disks.append(disk_dev)
514

    
515
  return disks
516

    
517

    
518
class LUInstanceRecreateDisks(LogicalUnit):
519
  """Recreate an instance's missing disks.
520

521
  """
522
  HPATH = "instance-recreate-disks"
523
  HTYPE = constants.HTYPE_INSTANCE
524
  REQ_BGL = False
525

    
526
  _MODIFYABLE = compat.UniqueFrozenset([
527
    constants.IDISK_SIZE,
528
    constants.IDISK_MODE,
529
    ])
530

    
531
  # New or changed disk parameters may have different semantics
532
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
533
    constants.IDISK_ADOPT,
534

    
535
    # TODO: Implement support changing VG while recreating
536
    constants.IDISK_VG,
537
    constants.IDISK_METAVG,
538
    constants.IDISK_PROVIDER,
539
    constants.IDISK_NAME,
540
    ]))
541

    
542
  def _RunAllocator(self):
543
    """Run the allocator based on input opcode.
544

545
    """
546
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
547

    
548
    # FIXME
549
    # The allocator should actually run in "relocate" mode, but current
550
    # allocators don't support relocating all the nodes of an instance at
551
    # the same time. As a workaround we use "allocate" mode, but this is
552
    # suboptimal for two reasons:
553
    # - The instance name passed to the allocator is present in the list of
554
    #   existing instances, so there could be a conflict within the
555
    #   internal structures of the allocator. This doesn't happen with the
556
    #   current allocators, but it's a liability.
557
    # - The allocator counts the resources used by the instance twice: once
558
    #   because the instance exists already, and once because it tries to
559
    #   allocate a new instance.
560
    # The allocator could choose some of the nodes on which the instance is
561
    # running, but that's not a problem. If the instance nodes are broken,
562
    # they should be already be marked as drained or offline, and hence
563
    # skipped by the allocator. If instance disks have been lost for other
564
    # reasons, then recreating the disks on the same nodes should be fine.
565
    disk_template = self.instance.disk_template
566
    spindle_use = be_full[constants.BE_SPINDLE_USE]
567
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
568
                                        disk_template=disk_template,
569
                                        tags=list(self.instance.GetTags()),
570
                                        os=self.instance.os,
571
                                        nics=[{}],
572
                                        vcpus=be_full[constants.BE_VCPUS],
573
                                        memory=be_full[constants.BE_MAXMEM],
574
                                        spindle_use=spindle_use,
575
                                        disks=[{constants.IDISK_SIZE: d.size,
576
                                                constants.IDISK_MODE: d.mode}
577
                                               for d in self.instance.disks],
578
                                        hypervisor=self.instance.hypervisor,
579
                                        node_whitelist=None)
580
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
581

    
582
    ial.Run(self.op.iallocator)
583

    
584
    assert req.RequiredNodes() == len(self.instance.all_nodes)
585

    
586
    if not ial.success:
587
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
588
                                 " %s" % (self.op.iallocator, ial.info),
589
                                 errors.ECODE_NORES)
590

    
591
    self.op.nodes = ial.result
592
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
593
                 self.op.instance_name, self.op.iallocator,
594
                 utils.CommaJoin(ial.result))
595

    
596
  def CheckArguments(self):
597
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
598
      # Normalize and convert deprecated list of disk indices
599
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
600

    
601
    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
602
    if duplicates:
603
      raise errors.OpPrereqError("Some disks have been specified more than"
604
                                 " once: %s" % utils.CommaJoin(duplicates),
605
                                 errors.ECODE_INVAL)
606

    
607
    # We don't want CheckIAllocatorOrNode selecting the default iallocator
608
    # when neither iallocator nor nodes are specified
609
    if self.op.iallocator or self.op.nodes:
610
      CheckIAllocatorOrNode(self, "iallocator", "nodes")
611

    
612
    for (idx, params) in self.op.disks:
613
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
614
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
615
      if unsupported:
616
        raise errors.OpPrereqError("Parameters for disk %s try to change"
617
                                   " unmodifyable parameter(s): %s" %
618
                                   (idx, utils.CommaJoin(unsupported)),
619
                                   errors.ECODE_INVAL)
620

    
621
  def ExpandNames(self):
622
    self._ExpandAndLockInstance()
623
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
624

    
625
    if self.op.nodes:
626
      self.op.nodes = [ExpandNodeName(self.cfg, n) for n in self.op.nodes]
627
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
628
    else:
629
      self.needed_locks[locking.LEVEL_NODE] = []
630
      if self.op.iallocator:
631
        # iallocator will select a new node in the same group
632
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
633
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
634

    
635
    self.needed_locks[locking.LEVEL_NODE_RES] = []
636

    
637
  def DeclareLocks(self, level):
638
    if level == locking.LEVEL_NODEGROUP:
639
      assert self.op.iallocator is not None
640
      assert not self.op.nodes
641
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
642
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
643
      # Lock the primary group used by the instance optimistically; this
644
      # requires going via the node before it's locked, requiring
645
      # verification later on
646
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
647
        self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)
648

    
649
    elif level == locking.LEVEL_NODE:
650
      # If an allocator is used, then we lock all the nodes in the current
651
      # instance group, as we don't know yet which ones will be selected;
652
      # if we replace the nodes without using an allocator, locks are
653
      # already declared in ExpandNames; otherwise, we need to lock all the
654
      # instance nodes for disk re-creation
655
      if self.op.iallocator:
656
        assert not self.op.nodes
657
        assert not self.needed_locks[locking.LEVEL_NODE]
658
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
659

    
660
        # Lock member nodes of the group of the primary node
661
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
662
          self.needed_locks[locking.LEVEL_NODE].extend(
663
            self.cfg.GetNodeGroup(group_uuid).members)
664

    
665
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
666
      elif not self.op.nodes:
667
        self._LockInstancesNodes(primary_only=False)
668
    elif level == locking.LEVEL_NODE_RES:
669
      # Copy node locks
670
      self.needed_locks[locking.LEVEL_NODE_RES] = \
671
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])
672

    
673
  def BuildHooksEnv(self):
674
    """Build hooks env.
675

676
    This runs on master, primary and secondary nodes of the instance.
677

678
    """
679
    return BuildInstanceHookEnvByObject(self, self.instance)
680

    
681
  def BuildHooksNodes(self):
682
    """Build hooks nodes.
683

684
    """
685
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
686
    return (nl, nl)
687

    
688
  def CheckPrereq(self):
689
    """Check prerequisites.
690

691
    This checks that the instance is in the cluster and is not running.
692

693
    """
694
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
695
    assert instance is not None, \
696
      "Cannot retrieve locked instance %s" % self.op.instance_name
697
    if self.op.nodes:
698
      if len(self.op.nodes) != len(instance.all_nodes):
699
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
700
                                   " %d replacement nodes were specified" %
701
                                   (instance.name, len(instance.all_nodes),
702
                                    len(self.op.nodes)),
703
                                   errors.ECODE_INVAL)
704
      assert instance.disk_template != constants.DT_DRBD8 or \
705
             len(self.op.nodes) == 2
706
      assert instance.disk_template != constants.DT_PLAIN or \
707
             len(self.op.nodes) == 1
708
      primary_node = self.op.nodes[0]
709
    else:
710
      primary_node = instance.primary_node
711
    if not self.op.iallocator:
712
      CheckNodeOnline(self, primary_node)
713

    
714
    if instance.disk_template == constants.DT_DISKLESS:
715
      raise errors.OpPrereqError("Instance '%s' has no disks" %
716
                                 self.op.instance_name, errors.ECODE_INVAL)
717

    
718
    # Verify if node group locks are still correct
719
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
720
    if owned_groups:
721
      # Node group locks are acquired only for the primary node (and only
722
      # when the allocator is used)
723
      CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
724
                              primary_only=True)
725

    
726
    # if we replace nodes *and* the old primary is offline, we don't
727
    # check the instance state
728
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
729
    if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
730
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
731
                         msg="cannot recreate disks")
732

    
733
    if self.op.disks:
734
      self.disks = dict(self.op.disks)
735
    else:
736
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
737

    
738
    maxidx = max(self.disks.keys())
739
    if maxidx >= len(instance.disks):
740
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
741
                                 errors.ECODE_INVAL)
742

    
743
    if ((self.op.nodes or self.op.iallocator) and
744
         sorted(self.disks.keys()) != range(len(instance.disks))):
745
      raise errors.OpPrereqError("Can't recreate disks partially and"
746
                                 " change the nodes at the same time",
747
                                 errors.ECODE_INVAL)
748

    
749
    self.instance = instance
750

    
751
    if self.op.iallocator:
752
      self._RunAllocator()
753
      # Release unneeded node and node resource locks
754
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
755
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
756
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
757

    
758
    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
759

    
760
  def Exec(self, feedback_fn):
761
    """Recreate the disks.
762

763
    """
764
    instance = self.instance
765

    
766
    assert (self.owned_locks(locking.LEVEL_NODE) ==
767
            self.owned_locks(locking.LEVEL_NODE_RES))
768

    
769
    to_skip = []
770
    mods = [] # keeps track of needed changes
771

    
772
    for idx, disk in enumerate(instance.disks):
773
      try:
774
        changes = self.disks[idx]
775
      except KeyError:
776
        # Disk should not be recreated
777
        to_skip.append(idx)
778
        continue
779

    
780
      # update secondaries for disks, if needed
781
      if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
782
        # need to update the nodes and minors
783
        assert len(self.op.nodes) == 2
784
        assert len(disk.logical_id) == 6 # otherwise disk internals
785
                                         # have changed
786
        (_, _, old_port, _, _, old_secret) = disk.logical_id
787
        new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
788
        new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
789
                  new_minors[0], new_minors[1], old_secret)
790
        assert len(disk.logical_id) == len(new_id)
791
      else:
792
        new_id = None
793

    
794
      mods.append((idx, new_id, changes))
795

    
796
    # now that we have passed all asserts above, we can apply the mods
797
    # in a single run (to avoid partial changes)
798
    for idx, new_id, changes in mods:
799
      disk = instance.disks[idx]
800
      if new_id is not None:
801
        assert disk.dev_type == constants.LD_DRBD8
802
        disk.logical_id = new_id
803
      if changes:
804
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
805
                    mode=changes.get(constants.IDISK_MODE, None))
806

    
807
    # change primary node, if needed
808
    if self.op.nodes:
809
      instance.primary_node = self.op.nodes[0]
810
      self.LogWarning("Changing the instance's nodes, you will have to"
811
                      " remove any disks left on the older nodes manually")
812

    
813
    if self.op.nodes:
814
      self.cfg.Update(instance, feedback_fn)
815

    
816
    # All touched nodes must be locked
817
    mylocks = self.owned_locks(locking.LEVEL_NODE)
818
    assert mylocks.issuperset(frozenset(instance.all_nodes))
819
    new_disks = CreateDisks(self, instance, to_skip=to_skip)
820

    
821
    # TODO: Release node locks before wiping, or explain why it's not possible
822
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
823
      wipedisks = [(idx, disk, 0)
824
                   for (idx, disk) in enumerate(instance.disks)
825
                   if idx not in to_skip]
826
      WipeOrCleanupDisks(self, instance, disks=wipedisks, cleanup=new_disks)
827

    
828

    
829
def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
830
  """Checks if nodes have enough free disk space in the specified VG.
831

832
  This function checks if all given nodes have the needed amount of
833
  free disk. In case any node has less disk or we cannot get the
834
  information from the node, this function raises an OpPrereqError
835
  exception.
836

837
  @type lu: C{LogicalUnit}
838
  @param lu: a logical unit from which we get configuration data
839
  @type nodenames: C{list}
840
  @param nodenames: the list of node names to check
841
  @type vg: C{str}
842
  @param vg: the volume group to check
843
  @type requested: C{int}
844
  @param requested: the amount of disk in MiB to check for
845
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
846
      or we cannot check the node
847

848
  """
849
  es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
850
  nodeinfo = lu.rpc.call_node_info(nodenames, [vg], None, es_flags)
851
  for node in nodenames:
852
    info = nodeinfo[node]
853
    info.Raise("Cannot get current information from node %s" % node,
854
               prereq=True, ecode=errors.ECODE_ENVIRON)
855
    (_, (vg_info, ), _) = info.payload
856
    vg_free = vg_info.get("vg_free", None)
857
    if not isinstance(vg_free, int):
858
      raise errors.OpPrereqError("Can't compute free disk space on node"
859
                                 " %s for vg %s, result was '%s'" %
860
                                 (node, vg, vg_free), errors.ECODE_ENVIRON)
861
    if requested > vg_free:
862
      raise errors.OpPrereqError("Not enough disk space on target node %s"
863
                                 " vg %s: required %d MiB, available %d MiB" %
864
                                 (node, vg, requested, vg_free),
865
                                 errors.ECODE_NORES)
866

    
867

    
868
def CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
869
  """Checks if nodes have enough free disk space in all the VGs.
870

871
  This function checks if all given nodes have the needed amount of
872
  free disk. In case any node has less disk or we cannot get the
873
  information from the node, this function raises an OpPrereqError
874
  exception.
875

876
  @type lu: C{LogicalUnit}
877
  @param lu: a logical unit from which we get configuration data
878
  @type nodenames: C{list}
879
  @param nodenames: the list of node names to check
880
  @type req_sizes: C{dict}
881
  @param req_sizes: the hash of vg and corresponding amount of disk in
882
      MiB to check for
883
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
884
      or we cannot check the node
885

886
  """
887
  for vg, req_size in req_sizes.items():
888
    _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
889

    
890

    
891
def _DiskSizeInBytesToMebibytes(lu, size):
892
  """Converts a disk size in bytes to mebibytes.
893

894
  Warns and rounds up if the size isn't an even multiple of 1 MiB.
895

896
  """
897
  (mib, remainder) = divmod(size, 1024 * 1024)
898

    
899
  if remainder != 0:
900
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
901
                  " to not overwrite existing data (%s bytes will not be"
902
                  " wiped)", (1024 * 1024) - remainder)
903
    mib += 1
904

    
905
  return mib
906

    
907

    
908
def _CalcEta(time_taken, written, total_size):
909
  """Calculates the ETA based on size written and total size.
910

911
  @param time_taken: The time taken so far
912
  @param written: amount written so far
913
  @param total_size: The total size of data to be written
914
  @return: The remaining time in seconds
915

916
  """
917
  avg_time = time_taken / float(written)
918
  return (total_size - written) * avg_time
919

    
920

    
921
def WipeDisks(lu, instance, disks=None):
922
  """Wipes instance disks.
923

924
  @type lu: L{LogicalUnit}
925
  @param lu: the logical unit on whose behalf we execute
926
  @type instance: L{objects.Instance}
927
  @param instance: the instance whose disks we should wipe
928
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
929
  @param disks: Disk details; tuple contains disk index, disk object and the
930
    start offset
931

932
  """
933
  node = instance.primary_node
934

    
935
  if disks is None:
936
    disks = [(idx, disk, 0)
937
             for (idx, disk) in enumerate(instance.disks)]
938

    
939
  for (_, device, _) in disks:
940
    lu.cfg.SetDiskID(device, node)
941

    
942
  logging.info("Pausing synchronization of disks of instance '%s'",
943
               instance.name)
944
  result = lu.rpc.call_blockdev_pause_resume_sync(node,
945
                                                  (map(compat.snd, disks),
946
                                                   instance),
947
                                                  True)
948
  result.Raise("Failed to pause disk synchronization on node '%s'" % node)
949

    
950
  for idx, success in enumerate(result.payload):
951
    if not success:
952
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
953
                   " failed", idx, instance.name)
954

    
955
  try:
956
    for (idx, device, offset) in disks:
957
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
958
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
959
      wipe_chunk_size = \
960
        int(min(constants.MAX_WIPE_CHUNK,
961
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
962

    
963
      size = device.size
964
      last_output = 0
965
      start_time = time.time()
966

    
967
      if offset == 0:
968
        info_text = ""
969
      else:
970
        info_text = (" (from %s to %s)" %
971
                     (utils.FormatUnit(offset, "h"),
972
                      utils.FormatUnit(size, "h")))
973

    
974
      lu.LogInfo("* Wiping disk %s%s", idx, info_text)
975

    
976
      logging.info("Wiping disk %d for instance %s on node %s using"
977
                   " chunk size %s", idx, instance.name, node, wipe_chunk_size)
978

    
979
      while offset < size:
980
        wipe_size = min(wipe_chunk_size, size - offset)
981

    
982
        logging.debug("Wiping disk %d, offset %s, chunk %s",
983
                      idx, offset, wipe_size)
984

    
985
        result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
986
                                           wipe_size)
987
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
988
                     (idx, offset, wipe_size))
989

    
990
        now = time.time()
991
        offset += wipe_size
992
        if now - last_output >= 60:
993
          eta = _CalcEta(now - start_time, offset, size)
994
          lu.LogInfo(" - done: %.1f%% ETA: %s",
995
                     offset / float(size) * 100, utils.FormatSeconds(eta))
996
          last_output = now
997
  finally:
998
    logging.info("Resuming synchronization of disks for instance '%s'",
999
                 instance.name)
1000

    
1001
    result = lu.rpc.call_blockdev_pause_resume_sync(node,
1002
                                                    (map(compat.snd, disks),
1003
                                                     instance),
1004
                                                    False)
1005

    
1006
    if result.fail_msg:
1007
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
1008
                    node, result.fail_msg)
1009
    else:
1010
      for idx, success in enumerate(result.payload):
1011
        if not success:
1012
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
1013
                        " failed", idx, instance.name)
1014

    
1015

    
1016
def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
1017
  """Wrapper for L{WipeDisks} that handles errors.
1018

1019
  @type lu: L{LogicalUnit}
1020
  @param lu: the logical unit on whose behalf we execute
1021
  @type instance: L{objects.Instance}
1022
  @param instance: the instance whose disks we should wipe
1023
  @param disks: see L{WipeDisks}
1024
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
1025
      case of error
1026
  @raise errors.OpPrereqError: in case of failure
1027

1028
  """
1029
  try:
1030
    WipeDisks(lu, instance, disks=disks)
1031
  except errors.OpExecError:
1032
    logging.warning("Wiping disks for instance '%s' failed",
1033
                    instance.name)
1034
    _UndoCreateDisks(lu, cleanup)
1035
    raise
1036

    
1037

    
1038
def ExpandCheckDisks(instance, disks):
1039
  """Return the instance disks selected by the disks list
1040

1041
  @type disks: list of L{objects.Disk} or None
1042
  @param disks: selected disks
1043
  @rtype: list of L{objects.Disk}
1044
  @return: selected instance disks to act on
1045

1046
  """
1047
  if disks is None:
1048
    return instance.disks
1049
  else:
1050
    if not set(disks).issubset(instance.disks):
1051
      raise errors.ProgrammerError("Can only act on disks belonging to the"
1052
                                   " target instance: expected a subset of %r,"
1053
                                   " got %r" % (instance.disks, disks))
1054
    return disks
1055

    
1056

    
1057
def WaitForSync(lu, instance, disks=None, oneshot=False):
1058
  """Sleep and poll for an instance's disk to sync.
1059

1060
  """
1061
  if not instance.disks or disks is not None and not disks:
1062
    return True
1063

    
1064
  disks = ExpandCheckDisks(instance, disks)
1065

    
1066
  if not oneshot:
1067
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1068

    
1069
  node = instance.primary_node
1070

    
1071
  for dev in disks:
1072
    lu.cfg.SetDiskID(dev, node)
1073

    
1074
  # TODO: Convert to utils.Retry
1075

    
1076
  retries = 0
1077
  degr_retries = 10 # in seconds, as we sleep 1 second each time
1078
  while True:
1079
    max_time = 0
1080
    done = True
1081
    cumul_degraded = False
1082
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
1083
    msg = rstats.fail_msg
1084
    if msg:
1085
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1086
      retries += 1
1087
      if retries >= 10:
1088
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1089
                                 " aborting." % node)
1090
      time.sleep(6)
1091
      continue
1092
    rstats = rstats.payload
1093
    retries = 0
1094
    for i, mstat in enumerate(rstats):
1095
      if mstat is None:
1096
        lu.LogWarning("Can't compute data for node %s/%s",
1097
                      node, disks[i].iv_name)
1098
        continue
1099

    
1100
      cumul_degraded = (cumul_degraded or
1101
                        (mstat.is_degraded and mstat.sync_percent is None))
1102
      if mstat.sync_percent is not None:
1103
        done = False
1104
        if mstat.estimated_time is not None:
1105
          rem_time = ("%s remaining (estimated)" %
1106
                      utils.FormatSeconds(mstat.estimated_time))
1107
          max_time = mstat.estimated_time
1108
        else:
1109
          rem_time = "no time estimate"
1110
        lu.LogInfo("- device %s: %5.2f%% done, %s",
1111
                   disks[i].iv_name, mstat.sync_percent, rem_time)
1112

    
1113
    # if we're done but degraded, let's do a few small retries, to
1114
    # make sure we see a stable and not transient situation; therefore
1115
    # we force restart of the loop
1116
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
1117
      logging.info("Degraded disks found, %d retries left", degr_retries)
1118
      degr_retries -= 1
1119
      time.sleep(1)
1120
      continue
1121

    
1122
    if done or oneshot:
1123
      break
1124

    
1125
    time.sleep(min(60, max_time))
1126

    
1127
  if done:
1128
    lu.LogInfo("Instance %s's disks are in sync", instance.name)
1129

    
1130
  return not cumul_degraded
1131

    
1132

    
1133
def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1134
  """Shutdown block devices of an instance.
1135

1136
  This does the shutdown on all nodes of the instance.
1137

1138
  Errors on the primary node make the whole operation fail unless
1139
  ignore_primary is true.
1140

1141
  """
1142
  all_result = True
1143

    
1144
  if disks is None:
1145
    # only mark instance disks as inactive if all disks are affected
1146
    lu.cfg.MarkInstanceDisksInactive(instance.name)
1147

    
1148
  disks = ExpandCheckDisks(instance, disks)
1149

    
1150
  for disk in disks:
1151
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1152
      lu.cfg.SetDiskID(top_disk, node)
1153
      result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
1154
      msg = result.fail_msg
1155
      if msg:
1156
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1157
                      disk.iv_name, node, msg)
1158
        if ((node == instance.primary_node and not ignore_primary) or
1159
            (node != instance.primary_node and not result.offline)):
1160
          all_result = False
1161
  return all_result
1162

    
1163

    
1164
def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1165
  """Shutdown block devices of an instance.
1166

1167
  This function checks that the instance is down before calling
1168
  ShutdownInstanceDisks.
1169

1170
  """
1171
  CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1172
  ShutdownInstanceDisks(lu, instance, disks=disks)
1173

    
1174

    
1175
def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1176
                          ignore_size=False):
1177
  """Prepare the block devices for an instance.
1178

1179
  This sets up the block devices on all nodes.
1180

1181
  @type lu: L{LogicalUnit}
1182
  @param lu: the logical unit on whose behalf we execute
1183
  @type instance: L{objects.Instance}
1184
  @param instance: the instance for whose disks we assemble
1185
  @type disks: list of L{objects.Disk} or None
1186
  @param disks: which disks to assemble (or all, if None)
1187
  @type ignore_secondaries: boolean
1188
  @param ignore_secondaries: if true, errors on secondary nodes
1189
      won't result in an error return from the function
1190
  @type ignore_size: boolean
1191
  @param ignore_size: if true, the current known size of the disk
1192
      will not be used during the disk activation, useful for cases
1193
      when the size is wrong
1194
  @return: False if the operation failed, otherwise a list of
1195
      (host, instance_visible_name, node_visible_name)
1196
      with the mapping from node devices to instance devices
1197

1198
  """
1199
  device_info = []
1200
  disks_ok = True
1201
  iname = instance.name
1202

    
1203
  if disks is None:
1204
    # only mark instance disks as active if all disks are affected
1205
    lu.cfg.MarkInstanceDisksActive(iname)
1206

    
1207
  disks = ExpandCheckDisks(instance, disks)
1208

    
1209
  # With the two-pass mechanism we try to reduce the window of
1210
  # opportunity for the race condition of switching DRBD to primary
1211
  # before handshaking occurred, but we do not eliminate it
1212

    
1213
  # The proper fix would be to wait (with some limits) until the
1214
  # connection has been made and drbd transitions from WFConnection
1215
  # into any other network-connected state (Connected, SyncTarget,
1216
  # SyncSource, etc.)
1217

    
1218
  # 1st pass, assemble on all nodes in secondary mode
1219
  for idx, inst_disk in enumerate(disks):
1220
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1221
      if ignore_size:
1222
        node_disk = node_disk.Copy()
1223
        node_disk.UnsetSize()
1224
      lu.cfg.SetDiskID(node_disk, node)
1225
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1226
                                             False, idx)
1227
      msg = result.fail_msg
1228
      if msg:
1229
        is_offline_secondary = (node in instance.secondary_nodes and
1230
                                result.offline)
1231
        lu.LogWarning("Could not prepare block device %s on node %s"
1232
                      " (is_primary=False, pass=1): %s",
1233
                      inst_disk.iv_name, node, msg)
1234
        if not (ignore_secondaries or is_offline_secondary):
1235
          disks_ok = False
1236

    
1237
  # FIXME: race condition on drbd migration to primary
1238

    
1239
  # 2nd pass, do only the primary node
1240
  for idx, inst_disk in enumerate(disks):
1241
    dev_path = None
1242

    
1243
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1244
      if node != instance.primary_node:
1245
        continue
1246
      if ignore_size:
1247
        node_disk = node_disk.Copy()
1248
        node_disk.UnsetSize()
1249
      lu.cfg.SetDiskID(node_disk, node)
1250
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1251
                                             True, idx)
1252
      msg = result.fail_msg
1253
      if msg:
1254
        lu.LogWarning("Could not prepare block device %s on node %s"
1255
                      " (is_primary=True, pass=2): %s",
1256
                      inst_disk.iv_name, node, msg)
1257
        disks_ok = False
1258
      else:
1259
        dev_path, _ = result.payload
1260

    
1261
    device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
1262

    
1263
  # leave the disks configured for the primary node
1264
  # this is a workaround that would be fixed better by
1265
  # improving the logical/physical id handling
1266
  for disk in disks:
1267
    lu.cfg.SetDiskID(disk, instance.primary_node)
1268

    
1269
  if not disks_ok:
1270
    lu.cfg.MarkInstanceDisksInactive(iname)
1271

    
1272
  return disks_ok, device_info
1273

    
1274

    
1275
def StartInstanceDisks(lu, instance, force):
1276
  """Start the disks of an instance.
1277

1278
  """
1279
  disks_ok, _ = AssembleInstanceDisks(lu, instance,
1280
                                      ignore_secondaries=force)
1281
  if not disks_ok:
1282
    ShutdownInstanceDisks(lu, instance)
1283
    if force is not None and not force:
1284
      lu.LogWarning("",
1285
                    hint=("If the message above refers to a secondary node,"
1286
                          " you can retry the operation using '--force'"))
1287
    raise errors.OpExecError("Disk consistency error")
1288

    
1289

    
1290
class LUInstanceGrowDisk(LogicalUnit):
1291
  """Grow a disk of an instance.
1292

1293
  """
1294
  HPATH = "disk-grow"
1295
  HTYPE = constants.HTYPE_INSTANCE
1296
  REQ_BGL = False
1297

    
1298
  def ExpandNames(self):
1299
    self._ExpandAndLockInstance()
1300
    self.needed_locks[locking.LEVEL_NODE] = []
1301
    self.needed_locks[locking.LEVEL_NODE_RES] = []
1302
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1303
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
1304

    
1305
  def DeclareLocks(self, level):
1306
    if level == locking.LEVEL_NODE:
1307
      self._LockInstancesNodes()
1308
    elif level == locking.LEVEL_NODE_RES:
1309
      # Copy node locks
1310
      self.needed_locks[locking.LEVEL_NODE_RES] = \
1311
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1312

    
1313
  def BuildHooksEnv(self):
1314
    """Build hooks env.
1315

1316
    This runs on the master, the primary and all the secondaries.
1317

1318
    """
1319
    env = {
1320
      "DISK": self.op.disk,
1321
      "AMOUNT": self.op.amount,
1322
      "ABSOLUTE": self.op.absolute,
1323
      }
1324
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
1325
    return env
1326

    
1327
  def BuildHooksNodes(self):
1328
    """Build hooks nodes.
1329

1330
    """
1331
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1332
    return (nl, nl)
1333

    
1334
  def CheckPrereq(self):
1335
    """Check prerequisites.
1336

1337
    This checks that the instance is in the cluster.
1338

1339
    """
1340
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1341
    assert instance is not None, \
1342
      "Cannot retrieve locked instance %s" % self.op.instance_name
1343
    nodenames = list(instance.all_nodes)
1344
    for node in nodenames:
1345
      CheckNodeOnline(self, node)
1346

    
1347
    self.instance = instance
1348

    
1349
    if instance.disk_template not in constants.DTS_GROWABLE:
1350
      raise errors.OpPrereqError("Instance's disk layout does not support"
1351
                                 " growing", errors.ECODE_INVAL)
1352

    
1353
    self.disk = instance.FindDisk(self.op.disk)
1354

    
1355
    if self.op.absolute:
1356
      self.target = self.op.amount
1357
      self.delta = self.target - self.disk.size
1358
      if self.delta < 0:
1359
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
1360
                                   "current disk size (%s)" %
1361
                                   (utils.FormatUnit(self.target, "h"),
1362
                                    utils.FormatUnit(self.disk.size, "h")),
1363
                                   errors.ECODE_STATE)
1364
    else:
1365
      self.delta = self.op.amount
1366
      self.target = self.disk.size + self.delta
1367
      if self.delta < 0:
1368
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
1369
                                   utils.FormatUnit(self.delta, "h"),
1370
                                   errors.ECODE_INVAL)
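    # Worked example (hypothetical numbers): for a 10240 MiB disk, a relative
    # request of 2048 gives delta=2048 and target=12288; an absolute request
    # of 12288 gives the same result, while absolute values below 10240 are
    # rejected above.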
1371

    
1372
    self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))
1373

    
1374
  def _CheckDiskSpace(self, nodenames, req_vgspace):
1375
    template = self.instance.disk_template
1376
    if template not in constants.DTS_NO_FREE_SPACE_CHECK:
1377
      # TODO: check the free disk space for file, when that feature will be
1378
      # supported
1379
      nodes = map(self.cfg.GetNodeInfo, nodenames)
1380
      es_nodes = filter(lambda n: IsExclusiveStorageEnabledNode(self.cfg, n),
1381
                        nodes)
1382
      if es_nodes:
1383
        # With exclusive storage we need to do something smarter than just
1384
        # looking at free space; for now, let's simply abort the operation.
1385
        raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
1386
                                   " is enabled", errors.ECODE_STATE)
1387
      CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)
1388

    
1389
  def Exec(self, feedback_fn):
1390
    """Execute disk grow.
1391

1392
    """
1393
    instance = self.instance
1394
    disk = self.disk
1395

    
1396
    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1397
    assert (self.owned_locks(locking.LEVEL_NODE) ==
1398
            self.owned_locks(locking.LEVEL_NODE_RES))
1399

    
1400
    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1401

    
1402
    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[disk])
1403
    if not disks_ok:
1404
      raise errors.OpExecError("Cannot activate block device to grow")
1405

    
1406
    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1407
                (self.op.disk, instance.name,
1408
                 utils.FormatUnit(self.delta, "h"),
1409
                 utils.FormatUnit(self.target, "h")))
1410

    
1411
    # First run all grow ops in dry-run mode
1412
    for node in instance.all_nodes:
1413
      self.cfg.SetDiskID(disk, node)
1414
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1415
                                           True, True)
1416
      result.Raise("Dry-run grow request failed to node %s" % node)
1417

    
1418
    if wipe_disks:
1419
      # Get disk size from primary node for wiping
1420
      self.cfg.SetDiskID(disk, instance.primary_node)
1421
      result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk])
1422
      result.Raise("Failed to retrieve disk size from node '%s'" %
1423
                   instance.primary_node)
1424

    
1425
      (disk_size_in_bytes, ) = result.payload
1426

    
1427
      if disk_size_in_bytes is None:
1428
        raise errors.OpExecError("Failed to retrieve disk size from primary"
1429
                                 " node '%s'" % instance.primary_node)
1430

    
1431
      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
1432

    
1433
      assert old_disk_size >= disk.size, \
1434
        ("Retrieved disk size too small (got %s, should be at least %s)" %
1435
         (old_disk_size, disk.size))
1436
    else:
1437
      old_disk_size = None
1438

    
1439
    # We know that (as far as we can test) operations across different
1440
    # nodes will succeed, time to run it for real on the backing storage
1441
    for node in instance.all_nodes:
1442
      self.cfg.SetDiskID(disk, node)
1443
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1444
                                           False, True)
1445
      result.Raise("Grow request failed to node %s" % node)
1446

    
1447
    # And now execute it for logical storage, on the primary node
1448
    node = instance.primary_node
1449
    self.cfg.SetDiskID(disk, node)
1450
    result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1451
                                         False, False)
1452
    result.Raise("Grow request failed to node %s" % node)
1453

    
1454
    disk.RecordGrow(self.delta)
1455
    self.cfg.Update(instance, feedback_fn)
1456

    
1457
    # Changes have been recorded, release node lock
1458
    ReleaseLocks(self, locking.LEVEL_NODE)
1459

    
1460
    # Downgrade lock while waiting for sync
1461
    self.glm.downgrade(locking.LEVEL_INSTANCE)
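    # Illustrative note: downgrading the exclusive instance lock to a shared
    # one is assumed to let read-only jobs proceed while we wait for the
    # resync below.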
1462

    
1463
    assert wipe_disks ^ (old_disk_size is None)
1464

    
1465
    if wipe_disks:
1466
      assert instance.disks[self.op.disk] == disk
1467

    
1468
      # Wipe newly added disk space
1469
      WipeDisks(self, instance,
1470
                disks=[(self.op.disk, disk, old_disk_size)])
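      # Illustrative reading: old_disk_size marks where the newly added space
      # begins, so presumably only the grown region is wiped rather than the
      # whole disk.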
1471

    
1472
    if self.op.wait_for_sync:
1473
      disk_abort = not WaitForSync(self, instance, disks=[disk])
1474
      if disk_abort:
1475
        self.LogWarning("Disk syncing has not returned a good status; check"
1476
                        " the instance")
1477
      if not instance.disks_active:
1478
        _SafeShutdownInstanceDisks(self, instance, disks=[disk])
1479
    elif not instance.disks_active:
1480
      self.LogWarning("Not shutting down the disk even if the instance is"
1481
                      " not supposed to be running because no wait for"
1482
                      " sync mode was requested")
1483

    
1484
    assert self.owned_locks(locking.LEVEL_NODE_RES)
1485
    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1486

    
1487

    
1488
class LUInstanceReplaceDisks(LogicalUnit):
1489
  """Replace the disks of an instance.
1490

1491
  """
1492
  HPATH = "mirrors-replace"
1493
  HTYPE = constants.HTYPE_INSTANCE
1494
  REQ_BGL = False
1495

    
1496
  def CheckArguments(self):
1497
    """Check arguments.
1498

1499
    """
1500
    remote_node = self.op.remote_node
1501
    ialloc = self.op.iallocator
1502
    if self.op.mode == constants.REPLACE_DISK_CHG:
1503
      if remote_node is None and ialloc is None:
1504
        raise errors.OpPrereqError("When changing the secondary either an"
1505
                                   " iallocator script must be used or the"
1506
                                   " new node given", errors.ECODE_INVAL)
1507
      else:
1508
        CheckIAllocatorOrNode(self, "iallocator", "remote_node")
1509

    
1510
    elif remote_node is not None or ialloc is not None:
1511
      # Not replacing the secondary
1512
      raise errors.OpPrereqError("The iallocator and new node options can"
1513
                                 " only be used when changing the"
1514
                                 " secondary node", errors.ECODE_INVAL)
1515

    
1516
  def ExpandNames(self):
1517
    self._ExpandAndLockInstance()
1518

    
1519
    assert locking.LEVEL_NODE not in self.needed_locks
1520
    assert locking.LEVEL_NODE_RES not in self.needed_locks
1521
    assert locking.LEVEL_NODEGROUP not in self.needed_locks
1522

    
1523
    assert self.op.iallocator is None or self.op.remote_node is None, \
1524
      "Conflicting options"
1525

    
1526
    if self.op.remote_node is not None:
1527
      self.op.remote_node = ExpandNodeName(self.cfg, self.op.remote_node)
1528

    
1529
      # Warning: do not remove the locking of the new secondary here
1530
      # unless DRBD8.AddChildren is changed to work in parallel;
1531
      # currently it doesn't since parallel invocations of
1532
      # FindUnusedMinor will conflict
1533
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
1534
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1535
    else:
1536
      self.needed_locks[locking.LEVEL_NODE] = []
1537
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1538

    
1539
      if self.op.iallocator is not None:
1540
        # iallocator will select a new node in the same group
1541
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
1542
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1543

    
1544
    self.needed_locks[locking.LEVEL_NODE_RES] = []
1545

    
1546
    self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
1547
                                   self.op.iallocator, self.op.remote_node,
1548
                                   self.op.disks, self.op.early_release,
1549
                                   self.op.ignore_ipolicy)
1550

    
1551
    self.tasklets = [self.replacer]
1552

    
1553
  def DeclareLocks(self, level):
1554
    if level == locking.LEVEL_NODEGROUP:
1555
      assert self.op.remote_node is None
1556
      assert self.op.iallocator is not None
1557
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
1558

    
1559
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
1560
      # Lock all groups used by instance optimistically; this requires going
1561
      # via the node before it's locked, requiring verification later on
1562
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
1563
        self.cfg.GetInstanceNodeGroups(self.op.instance_name)
1564

    
1565
    elif level == locking.LEVEL_NODE:
1566
      if self.op.iallocator is not None:
1567
        assert self.op.remote_node is None
1568
        assert not self.needed_locks[locking.LEVEL_NODE]
1569
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
1570

    
1571
        # Lock member nodes of all locked groups
1572
        self.needed_locks[locking.LEVEL_NODE] = \
1573
          [node_name
1574
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1575
           for node_name in self.cfg.GetNodeGroup(group_uuid).members]
1576
      else:
1577
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1578

    
1579
        self._LockInstancesNodes()
1580

    
1581
    elif level == locking.LEVEL_NODE_RES:
1582
      # Reuse node locks
1583
      self.needed_locks[locking.LEVEL_NODE_RES] = \
1584
        self.needed_locks[locking.LEVEL_NODE]
1585

    
1586
  def BuildHooksEnv(self):
1587
    """Build hooks env.
1588

1589
    This runs on the master, the primary and all the secondaries.
1590

1591
    """
1592
    instance = self.replacer.instance
1593
    env = {
1594
      "MODE": self.op.mode,
1595
      "NEW_SECONDARY": self.op.remote_node,
1596
      "OLD_SECONDARY": instance.secondary_nodes[0],
1597
      }
1598
    env.update(BuildInstanceHookEnvByObject(self, instance))
1599
    return env
1600

    
1601
  def BuildHooksNodes(self):
1602
    """Build hooks nodes.
1603

1604
    """
1605
    instance = self.replacer.instance
1606
    nl = [
1607
      self.cfg.GetMasterNode(),
1608
      instance.primary_node,
1609
      ]
1610
    if self.op.remote_node is not None:
1611
      nl.append(self.op.remote_node)
1612
    return nl, nl
1613

    
1614
  def CheckPrereq(self):
1615
    """Check prerequisites.
1616

1617
    """
1618
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
1619
            self.op.iallocator is None)
1620

    
1621
    # Verify if node group locks are still correct
1622
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
1623
    if owned_groups:
1624
      CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
1625

    
1626
    return LogicalUnit.CheckPrereq(self)
1627

    
1628

    
1629
class LUInstanceActivateDisks(NoHooksLU):
1630
  """Bring up an instance's disks.
1631

1632
  """
1633
  REQ_BGL = False
1634

    
1635
  def ExpandNames(self):
1636
    self._ExpandAndLockInstance()
1637
    self.needed_locks[locking.LEVEL_NODE] = []
1638
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1639

    
1640
  def DeclareLocks(self, level):
1641
    if level == locking.LEVEL_NODE:
1642
      self._LockInstancesNodes()
1643

    
1644
  def CheckPrereq(self):
1645
    """Check prerequisites.
1646

1647
    This checks that the instance is in the cluster.
1648

1649
    """
1650
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1651
    assert self.instance is not None, \
1652
      "Cannot retrieve locked instance %s" % self.op.instance_name
1653
    CheckNodeOnline(self, self.instance.primary_node)
1654

    
1655
  def Exec(self, feedback_fn):
1656
    """Activate the disks.
1657

1658
    """
1659
    disks_ok, disks_info = \
1660
              AssembleInstanceDisks(self, self.instance,
1661
                                    ignore_size=self.op.ignore_size)
1662
    if not disks_ok:
1663
      raise errors.OpExecError("Cannot activate block devices")
1664

    
1665
    if self.op.wait_for_sync:
1666
      if not WaitForSync(self, self.instance):
1667
        self.cfg.MarkInstanceDisksInactive(self.instance.name)
1668
        raise errors.OpExecError("Some disks of the instance are degraded!")
1669

    
1670
    return disks_info
1671

    
1672

    
1673
class LUInstanceDeactivateDisks(NoHooksLU):
1674
  """Shutdown an instance's disks.
1675

1676
  """
1677
  REQ_BGL = False
1678

    
1679
  def ExpandNames(self):
1680
    self._ExpandAndLockInstance()
1681
    self.needed_locks[locking.LEVEL_NODE] = []
1682
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1683

    
1684
  def DeclareLocks(self, level):
1685
    if level == locking.LEVEL_NODE:
1686
      self._LockInstancesNodes()
1687

    
1688
  def CheckPrereq(self):
1689
    """Check prerequisites.
1690

1691
    This checks that the instance is in the cluster.
1692

1693
    """
1694
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1695
    assert self.instance is not None, \
1696
      "Cannot retrieve locked instance %s" % self.op.instance_name
1697

    
1698
  def Exec(self, feedback_fn):
1699
    """Deactivate the disks
1700

1701
    """
1702
    instance = self.instance
1703
    if self.op.force:
1704
      ShutdownInstanceDisks(self, instance)
1705
    else:
1706
      _SafeShutdownInstanceDisks(self, instance)
1707

    
1708

    
1709
def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
1710
                               ldisk=False):
1711
  """Check that mirrors are not degraded.
1712

1713
  @attention: The device has to be annotated already.
1714

1715
  The ldisk parameter, if True, will change the test from the
1716
  is_degraded attribute (which represents overall non-ok status for
1717
  the device(s)) to the ldisk (representing the local storage status).
1718

1719
  """
1720
  lu.cfg.SetDiskID(dev, node)
1721

    
1722
  result = True
1723

    
1724
  if on_primary or dev.AssembleOnSecondary():
1725
    rstats = lu.rpc.call_blockdev_find(node, dev)
1726
    msg = rstats.fail_msg
1727
    if msg:
1728
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1729
      result = False
1730
    elif not rstats.payload:
1731
      lu.LogWarning("Can't find disk on node %s", node)
1732
      result = False
1733
    else:
1734
      if ldisk:
1735
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
1736
      else:
1737
        result = result and not rstats.payload.is_degraded
1738

    
1739
  if dev.children:
1740
    for child in dev.children:
1741
      result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
1742
                                                     on_primary)
1743

    
1744
  return result
1745

    
1746

    
1747
def CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
1748
  """Wrapper around L{_CheckDiskConsistencyInner}.
1749

1750
  """
1751
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1752
  return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
1753
                                    ldisk=ldisk)
1754

    
1755

    
1756
def _BlockdevFind(lu, node, dev, instance):
1757
  """Wrapper around call_blockdev_find to annotate diskparams.
1758

1759
  @param lu: A reference to the lu object
1760
  @param node: The node to call out
1761
  @param dev: The device to find
1762
  @param instance: The instance object the device belongs to
1763
  @returns The result of the rpc call
1764

1765
  """
1766
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1767
  return lu.rpc.call_blockdev_find(node, disk)
1768

    
1769

    
1770
def _GenerateUniqueNames(lu, exts):
1771
  """Generate a suitable LV name.
1772

1773
  This will generate a logical volume name for the given instance.
1774

1775
  """
1776
  results = []
1777
  for val in exts:
1778
    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
1779
    results.append("%s%s" % (new_id, val))
1780
  return results
1781

    
1782

    
1783
class TLReplaceDisks(Tasklet):
1784
  """Replaces disks for an instance.
1785

1786
  Note: Locking is not within the scope of this class.
1787

1788
  """
1789
  def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
1790
               disks, early_release, ignore_ipolicy):
1791
    """Initializes this class.
1792

1793
    """
1794
    Tasklet.__init__(self, lu)
1795

    
1796
    # Parameters
1797
    self.instance_name = instance_name
1798
    self.mode = mode
1799
    self.iallocator_name = iallocator_name
1800
    self.remote_node = remote_node
1801
    self.disks = disks
1802
    self.early_release = early_release
1803
    self.ignore_ipolicy = ignore_ipolicy
1804

    
1805
    # Runtime data
1806
    self.instance = None
1807
    self.new_node = None
1808
    self.target_node = None
1809
    self.other_node = None
1810
    self.remote_node_info = None
1811
    self.node_secondary_ip = None
1812

    
1813
  @staticmethod
1814
  def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
1815
    """Compute a new secondary node using an IAllocator.
1816

1817
    """
1818
    req = iallocator.IAReqRelocate(name=instance_name,
1819
                                   relocate_from=list(relocate_from))
1820
    ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
1821

    
1822
    ial.Run(iallocator_name)
1823

    
1824
    if not ial.success:
1825
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
1826
                                 " %s" % (iallocator_name, ial.info),
1827
                                 errors.ECODE_NORES)
1828

    
1829
    remote_node_name = ial.result[0]
1830

    
1831
    lu.LogInfo("Selected new secondary for instance '%s': %s",
1832
               instance_name, remote_node_name)
1833

    
1834
    return remote_node_name
1835

    
1836
  def _FindFaultyDisks(self, node_name):
1837
    """Wrapper for L{FindFaultyInstanceDisks}.
1838

1839
    """
1840
    return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
1841
                                   node_name, True)
1842

    
1843
  def _CheckDisksActivated(self, instance):
1844
    """Checks if the instance disks are activated.
1845

1846
    @param instance: The instance to check disks
1847
    @return: True if they are activated, False otherwise
1848

1849
    """
1850
    nodes = instance.all_nodes
1851

    
1852
    for idx, dev in enumerate(instance.disks):
1853
      for node in nodes:
1854
        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
1855
        self.cfg.SetDiskID(dev, node)
1856

    
1857
        result = _BlockdevFind(self, node, dev, instance)
1858

    
1859
        if result.offline:
1860
          continue
1861
        elif result.fail_msg or not result.payload:
1862
          return False
1863

    
1864
    return True
1865

    
1866
  def CheckPrereq(self):
1867
    """Check prerequisites.
1868

1869
    This checks that the instance is in the cluster.
1870

1871
    """
1872
    self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
1873
    assert instance is not None, \
1874
      "Cannot retrieve locked instance %s" % self.instance_name
1875

    
1876
    if instance.disk_template != constants.DT_DRBD8:
1877
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
1878
                                 " instances", errors.ECODE_INVAL)
1879

    
1880
    if len(instance.secondary_nodes) != 1:
1881
      raise errors.OpPrereqError("The instance has a strange layout,"
1882
                                 " expected one secondary but found %d" %
1883
                                 len(instance.secondary_nodes),
1884
                                 errors.ECODE_FAULT)
1885

    
1886
    instance = self.instance
1887
    secondary_node = instance.secondary_nodes[0]
1888

    
1889
    if self.iallocator_name is None:
1890
      remote_node = self.remote_node
1891
    else:
1892
      remote_node = self._RunAllocator(self.lu, self.iallocator_name,
1893
                                       instance.name, instance.secondary_nodes)
1894

    
1895
    if remote_node is None:
1896
      self.remote_node_info = None
1897
    else:
1898
      assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \
1899
             "Remote node '%s' is not locked" % remote_node
1900

    
1901
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
1902
      assert self.remote_node_info is not None, \
1903
        "Cannot retrieve locked node %s" % remote_node
1904

    
1905
    if remote_node == self.instance.primary_node:
1906
      raise errors.OpPrereqError("The specified node is the primary node of"
1907
                                 " the instance", errors.ECODE_INVAL)
1908

    
1909
    if remote_node == secondary_node:
1910
      raise errors.OpPrereqError("The specified node is already the"
1911
                                 " secondary node of the instance",
1912
                                 errors.ECODE_INVAL)
1913

    
1914
    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
1915
                                    constants.REPLACE_DISK_CHG):
1916
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
1917
                                 errors.ECODE_INVAL)
1918

    
1919
    if self.mode == constants.REPLACE_DISK_AUTO:
1920
      if not self._CheckDisksActivated(instance):
1921
        raise errors.OpPrereqError("Please run activate-disks on instance %s"
1922
                                   " first" % self.instance_name,
1923
                                   errors.ECODE_STATE)
1924
      faulty_primary = self._FindFaultyDisks(instance.primary_node)
1925
      faulty_secondary = self._FindFaultyDisks(secondary_node)
1926

    
1927
      if faulty_primary and faulty_secondary:
1928
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
1929
                                   " one node and can not be repaired"
1930
                                   " automatically" % self.instance_name,
1931
                                   errors.ECODE_STATE)
1932

    
1933
      if faulty_primary:
1934
        self.disks = faulty_primary
1935
        self.target_node = instance.primary_node
1936
        self.other_node = secondary_node
1937
        check_nodes = [self.target_node, self.other_node]
1938
      elif faulty_secondary:
1939
        self.disks = faulty_secondary
1940
        self.target_node = secondary_node
1941
        self.other_node = instance.primary_node
1942
        check_nodes = [self.target_node, self.other_node]
1943
      else:
1944
        self.disks = []
1945
        check_nodes = []
1946

    
1947
    else:
1948
      # Non-automatic modes
1949
      if self.mode == constants.REPLACE_DISK_PRI:
1950
        self.target_node = instance.primary_node
1951
        self.other_node = secondary_node
1952
        check_nodes = [self.target_node, self.other_node]
1953

    
1954
      elif self.mode == constants.REPLACE_DISK_SEC:
1955
        self.target_node = secondary_node
1956
        self.other_node = instance.primary_node
1957
        check_nodes = [self.target_node, self.other_node]
1958

    
1959
      elif self.mode == constants.REPLACE_DISK_CHG:
1960
        self.new_node = remote_node
1961
        self.other_node = instance.primary_node
1962
        self.target_node = secondary_node
1963
        check_nodes = [self.new_node, self.other_node]
1964

    
1965
        CheckNodeNotDrained(self.lu, remote_node)
1966
        CheckNodeVmCapable(self.lu, remote_node)
1967

    
1968
        old_node_info = self.cfg.GetNodeInfo(secondary_node)
1969
        assert old_node_info is not None
1970
        if old_node_info.offline and not self.early_release:
1971
          # doesn't make sense to delay the release
1972
          self.early_release = True
1973
          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
1974
                          " early-release mode", secondary_node)
1975

    
1976
      else:
1977
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
1978
                                     self.mode)
1979

    
1980
      # If not specified all disks should be replaced
1981
      if not self.disks:
1982
        self.disks = range(len(self.instance.disks))
1983

    
1984
    # TODO: This is ugly, but right now we can't distinguish between an
1985
    # internally submitted opcode and an external one. We should fix that.
1986
    if self.remote_node_info:
1987
      # We change the node, lets verify it still meets instance policy
1988
      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
1989
      cluster = self.cfg.GetClusterInfo()
1990
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1991
                                                              new_group_info)
1992
      CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
1993
                             self.cfg, ignore=self.ignore_ipolicy)
1994

    
1995
    for node in check_nodes:
1996
      CheckNodeOnline(self.lu, node)
1997

    
1998
    touched_nodes = frozenset(node_name for node_name in [self.new_node,
1999
                                                          self.other_node,
2000
                                                          self.target_node]
2001
                              if node_name is not None)
2002

    
2003
    # Release unneeded node and node resource locks
2004
    ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
2005
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
2006
    ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
2007

    
2008
    # Release any owned node group
2009
    ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
2010

    
2011
    # Check whether disks are valid
2012
    for disk_idx in self.disks:
2013
      instance.FindDisk(disk_idx)
2014

    
2015
    # Get secondary node IP addresses
2016
    self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node)
2017
                                  in self.cfg.GetMultiNodeInfo(touched_nodes))
2018

    
2019
  def Exec(self, feedback_fn):
2020
    """Execute disk replacement.
2021

2022
    This dispatches the disk replacement to the appropriate handler.
2023

2024
    """
2025
    if __debug__:
2026
      # Verify owned locks before starting operation
2027
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
2028
      assert set(owned_nodes) == set(self.node_secondary_ip), \
2029
          ("Incorrect node locks, owning %s, expected %s" %
2030
           (owned_nodes, self.node_secondary_ip.keys()))
2031
      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
2032
              self.lu.owned_locks(locking.LEVEL_NODE_RES))
2033
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
2034

    
2035
      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
2036
      assert list(owned_instances) == [self.instance_name], \
2037
          "Instance '%s' not locked" % self.instance_name
2038

    
2039
      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
2040
          "Should not own any node group lock at this point"
2041

    
2042
    if not self.disks:
2043
      feedback_fn("No disks need replacement for instance '%s'" %
2044
                  self.instance.name)
2045
      return
2046

    
2047
    feedback_fn("Replacing disk(s) %s for instance '%s'" %
2048
                (utils.CommaJoin(self.disks), self.instance.name))
2049
    feedback_fn("Current primary node: %s" % self.instance.primary_node)
2050
    feedback_fn("Current seconary node: %s" %
2051
                utils.CommaJoin(self.instance.secondary_nodes))
2052

    
2053
    activate_disks = not self.instance.disks_active
2054

    
2055
    # Activate the instance disks if we're replacing them on a down instance
2056
    if activate_disks:
2057
      StartInstanceDisks(self.lu, self.instance, True)
2058

    
2059
    try:
2060
      # Should we replace the secondary node?
2061
      if self.new_node is not None:
2062
        fn = self._ExecDrbd8Secondary
2063
      else:
2064
        fn = self._ExecDrbd8DiskOnly
2065

    
2066
      result = fn(feedback_fn)
2067
    finally:
2068
      # Deactivate the instance disks if we're replacing them on a
2069
      # down instance
2070
      if activate_disks:
2071
        _SafeShutdownInstanceDisks(self.lu, self.instance)
2072

    
2073
    assert not self.lu.owned_locks(locking.LEVEL_NODE)
2074

    
2075
    if __debug__:
2076
      # Verify owned locks
2077
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
2078
      nodes = frozenset(self.node_secondary_ip)
2079
      assert ((self.early_release and not owned_nodes) or
2080
              (not self.early_release and not (set(owned_nodes) - nodes))), \
2081
        ("Not owning the correct locks, early_release=%s, owned=%r,"
2082
         " nodes=%r" % (self.early_release, owned_nodes, nodes))
2083

    
2084
    return result
2085

    
2086
  def _CheckVolumeGroup(self, nodes):
2087
    self.lu.LogInfo("Checking volume groups")
2088

    
2089
    vgname = self.cfg.GetVGName()
2090

    
2091
    # Make sure volume group exists on all involved nodes
2092
    results = self.rpc.call_vg_list(nodes)
2093
    if not results:
2094
      raise errors.OpExecError("Can't list volume groups on the nodes")
2095

    
2096
    for node in nodes:
2097
      res = results[node]
2098
      res.Raise("Error checking node %s" % node)
2099
      if vgname not in res.payload:
2100
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
2101
                                 (vgname, node))
2102

    
2103
  def _CheckDisksExistence(self, nodes):
2104
    # Check disk existence
2105
    for idx, dev in enumerate(self.instance.disks):
2106
      if idx not in self.disks:
2107
        continue
2108

    
2109
      for node in nodes:
2110
        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
2111
        self.cfg.SetDiskID(dev, node)
2112

    
2113
        result = _BlockdevFind(self, node, dev, self.instance)
2114

    
2115
        msg = result.fail_msg
2116
        if msg or not result.payload:
2117
          if not msg:
2118
            msg = "disk not found"
2119
          if not self._CheckDisksActivated(self.instance):
2120
            extra_hint = ("\nDisks seem to be not properly activated. Try"
2121
                          " running activate-disks on the instance before"
2122
                          " using replace-disks.")
2123
          else:
2124
            extra_hint = ""
2125
          raise errors.OpExecError("Can't find disk/%d on node %s: %s%s" %
2126
                                   (idx, node, msg, extra_hint))
2127

    
2128
  def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
2129
    for idx, dev in enumerate(self.instance.disks):
2130
      if idx not in self.disks:
2131
        continue
2132

    
2133
      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
2134
                      (idx, node_name))
2135

    
2136
      if not CheckDiskConsistency(self.lu, self.instance, dev, node_name,
2137
                                  on_primary, ldisk=ldisk):
2138
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
2139
                                 " replace disks for instance %s" %
2140
                                 (node_name, self.instance.name))
2141

    
2142
  def _CreateNewStorage(self, node_name):
2143
    """Create new storage on the primary or secondary node.
2144

2145
    This is only used for same-node replaces, not for changing the
2146
    secondary node, hence we don't want to modify the existing disk.
2147

2148
    """
2149
    iv_names = {}
2150

    
2151
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2152
    for idx, dev in enumerate(disks):
2153
      if idx not in self.disks:
2154
        continue
2155

    
2156
      self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx)
2157

    
2158
      self.cfg.SetDiskID(dev, node_name)
2159

    
2160
      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
2161
      names = _GenerateUniqueNames(self.lu, lv_names)
2162

    
2163
      (data_disk, meta_disk) = dev.children
2164
      vg_data = data_disk.logical_id[0]
2165
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
2166
                             logical_id=(vg_data, names[0]),
2167
                             params=data_disk.params)
2168
      vg_meta = meta_disk.logical_id[0]
2169
      lv_meta = objects.Disk(dev_type=constants.LD_LV,
2170
                             size=constants.DRBD_META_SIZE,
2171
                             logical_id=(vg_meta, names[1]),
2172
                             params=meta_disk.params)
2173

    
2174
      new_lvs = [lv_data, lv_meta]
2175
      old_lvs = [child.Copy() for child in dev.children]
2176
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
2177
      excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)
2178

    
2179
      # we pass force_create=True to force the LVM creation
2180
      for new_lv in new_lvs:
2181
        try:
2182
          _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
2183
                               GetInstanceInfoText(self.instance), False,
2184
                               excl_stor)
2185
        except errors.DeviceCreationError, e:
2186
          raise errors.OpExecError("Can't create block device: %s" % e.message)
2187

    
2188
    return iv_names
2189

    
2190
  def _CheckDevices(self, node_name, iv_names):
2191
    for name, (dev, _, _) in iv_names.iteritems():
2192
      self.cfg.SetDiskID(dev, node_name)
2193

    
2194
      result = _BlockdevFind(self, node_name, dev, self.instance)
2195

    
2196
      msg = result.fail_msg
2197
      if msg or not result.payload:
2198
        if not msg:
2199
          msg = "disk not found"
2200
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
2201
                                 (name, msg))
2202

    
2203
      if result.payload.is_degraded:
2204
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
2205

    
2206
  def _RemoveOldStorage(self, node_name, iv_names):
2207
    for name, (_, old_lvs, _) in iv_names.iteritems():
2208
      self.lu.LogInfo("Remove logical volumes for %s", name)
2209

    
2210
      for lv in old_lvs:
2211
        self.cfg.SetDiskID(lv, node_name)
2212

    
2213
        msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
2214
        if msg:
2215
          self.lu.LogWarning("Can't remove old LV: %s", msg,
2216
                             hint="remove unused LVs manually")
2217

    
2218
  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
2219
    """Replace a disk on the primary or secondary for DRBD 8.
2220

2221
    The algorithm for replace is quite complicated:
2222

2223
      1. for each disk to be replaced:
2224

2225
        1. create new LVs on the target node with unique names
2226
        1. detach old LVs from the drbd device
2227
        1. rename old LVs to name_replaced.<time_t>
2228
        1. rename new LVs to old LVs
2229
        1. attach the new LVs (with the old names now) to the drbd device
2230

2231
      1. wait for sync across all devices
2232

2233
      1. for each modified disk:
2234

2235
        1. remove old LVs (which have the name name_replaces.<time_t>)
2236

2237
    Failures are not very well handled.
2238

2239
    """
2240
    steps_total = 6
2241

    
2242
    # Step: check device activation
2243
    self.lu.LogStep(1, steps_total, "Check device existence")
2244
    self._CheckDisksExistence([self.other_node, self.target_node])
2245
    self._CheckVolumeGroup([self.target_node, self.other_node])
2246

    
2247
    # Step: check other node consistency
2248
    self.lu.LogStep(2, steps_total, "Check peer consistency")
2249
    self._CheckDisksConsistency(self.other_node,
2250
                                self.other_node == self.instance.primary_node,
2251
                                False)
2252

    
2253
    # Step: create new storage
2254
    self.lu.LogStep(3, steps_total, "Allocate new storage")
2255
    iv_names = self._CreateNewStorage(self.target_node)
2256

    
2257
    # Step: for each lv, detach+rename*2+attach
2258
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2259
    for dev, old_lvs, new_lvs in iv_names.itervalues():
2260
      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
2261

    
2262
      result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
2263
                                                     old_lvs)
2264
      result.Raise("Can't detach drbd from local storage on node"
2265
                   " %s for device %s" % (self.target_node, dev.iv_name))
2266
      #dev.children = []
2267
      #cfg.Update(instance)
2268

    
2269
      # ok, we created the new LVs, so now we know we have the needed
2270
      # storage; as such, we proceed on the target node to rename
2271
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
2272
      # using the assumption that logical_id == physical_id (which in
2273
      # turn is the unique_id on that node)
2274

    
2275
      # FIXME(iustin): use a better name for the replaced LVs
2276
      temp_suffix = int(time.time())
2277
      ren_fn = lambda d, suff: (d.physical_id[0],
2278
                                d.physical_id[1] + "_replaced-%s" % suff)
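      # Illustrative example (hypothetical names): ("xenvg", "abc.disk0_data")
      # would become ("xenvg", "abc.disk0_data_replaced-1513791234"), i.e. the
      # old LV keeps its volume group but gains a timestamped suffix.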
2279

    
2280
      # Build the rename list based on what LVs exist on the node
2281
      rename_old_to_new = []
2282
      for to_ren in old_lvs:
2283
        result = self.rpc.call_blockdev_find(self.target_node, to_ren)
2284
        if not result.fail_msg and result.payload:
2285
          # device exists
2286
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
2287

    
2288
      self.lu.LogInfo("Renaming the old LVs on the target node")
2289
      result = self.rpc.call_blockdev_rename(self.target_node,
2290
                                             rename_old_to_new)
2291
      result.Raise("Can't rename old LVs on node %s" % self.target_node)
2292

    
2293
      # Now we rename the new LVs to the old LVs
2294
      self.lu.LogInfo("Renaming the new LVs on the target node")
2295
      rename_new_to_old = [(new, old.physical_id)
2296
                           for old, new in zip(old_lvs, new_lvs)]
2297
      result = self.rpc.call_blockdev_rename(self.target_node,
2298
                                             rename_new_to_old)
2299
      result.Raise("Can't rename new LVs on node %s" % self.target_node)
2300

    
2301
      # Intermediate steps of in-memory modifications
2302
      for old, new in zip(old_lvs, new_lvs):
2303
        new.logical_id = old.logical_id
2304
        self.cfg.SetDiskID(new, self.target_node)
2305

    
2306
      # We need to modify old_lvs so that removal later removes the
2307
      # right LVs, not the newly added ones; note that old_lvs is a
2308
      # copy here
2309
      for disk in old_lvs:
2310
        disk.logical_id = ren_fn(disk, temp_suffix)
2311
        self.cfg.SetDiskID(disk, self.target_node)
2312

    
2313
      # Now that the new lvs have the old name, we can add them to the device
2314
      self.lu.LogInfo("Adding new mirror component on %s", self.target_node)
2315
      result = self.rpc.call_blockdev_addchildren(self.target_node,
2316
                                                  (dev, self.instance), new_lvs)
2317
      msg = result.fail_msg
2318
      if msg:
2319
        for new_lv in new_lvs:
2320
          msg2 = self.rpc.call_blockdev_remove(self.target_node,
2321
                                               new_lv).fail_msg
2322
          if msg2:
2323
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
2324
                               hint=("cleanup manually the unused logical"
2325
                                     "volumes"))
2326
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
2327

    
2328
    cstep = itertools.count(5)
2329

    
2330
    if self.early_release:
2331
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2332
      self._RemoveOldStorage(self.target_node, iv_names)
2333
      # TODO: Check if releasing locks early still makes sense
2334
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2335
    else:
2336
      # Release all resource locks except those used by the instance
2337
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2338
                   keep=self.node_secondary_ip.keys())
2339

    
2340
    # Release all node locks while waiting for sync
2341
    ReleaseLocks(self.lu, locking.LEVEL_NODE)
2342

    
2343
    # TODO: Can the instance lock be downgraded here? Take the optional disk
2344
    # shutdown in the caller into consideration.
2345

    
2346
    # Wait for sync
2347
    # This can fail as the old devices are degraded and WaitForSync
2348
    # does a combined result over all disks, so we don't check its return value
2349
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2350
    WaitForSync(self.lu, self.instance)
2351

    
2352
    # Check all devices manually
2353
    self._CheckDevices(self.instance.primary_node, iv_names)
2354

    
2355
    # Step: remove old storage
2356
    if not self.early_release:
2357
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2358
      self._RemoveOldStorage(self.target_node, iv_names)
2359

    
2360
  def _ExecDrbd8Secondary(self, feedback_fn):
2361
    """Replace the secondary node for DRBD 8.
2362

2363
    The algorithm for replace is quite complicated:
2364
      - for all disks of the instance:
2365
        - create new LVs on the new node with same names
2366
        - shutdown the drbd device on the old secondary
2367
        - disconnect the drbd network on the primary
2368
        - create the drbd device on the new secondary
2369
        - network attach the drbd on the primary, using an artifice:
2370
          the drbd code for Attach() will connect to the network if it
2371
          finds a device which is connected to the good local disks but
2372
          not network enabled
2373
      - wait for sync across all devices
2374
      - remove all disks from the old secondary
2375

2376
    Failures are not very well handled.
2377

2378
    """
2379
    steps_total = 6
2380

    
2381
    pnode = self.instance.primary_node
2382

    
2383
    # Step: check device activation
2384
    self.lu.LogStep(1, steps_total, "Check device existence")
2385
    self._CheckDisksExistence([self.instance.primary_node])
2386
    self._CheckVolumeGroup([self.instance.primary_node])
2387

    
2388
    # Step: check other node consistency
2389
    self.lu.LogStep(2, steps_total, "Check peer consistency")
2390
    self._CheckDisksConsistency(self.instance.primary_node, True, True)
2391

    
2392
    # Step: create new storage
2393
    self.lu.LogStep(3, steps_total, "Allocate new storage")
2394
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2395
    excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
2396
    for idx, dev in enumerate(disks):
2397
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
2398
                      (self.new_node, idx))
2399
      # we pass force_create=True to force LVM creation
2400
      for new_lv in dev.children:
2401
        try:
2402
          _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
2403
                               True, GetInstanceInfoText(self.instance), False,
2404
                               excl_stor)
2405
        except errors.DeviceCreationError, e:
2406
          raise errors.OpExecError("Can't create block device: %s" % e.message)
2407

    
2408
    # Step 4: drbd minors and drbd setup changes
2409
    # after this, we must manually remove the drbd minors on both the
2410
    # error and the success paths
2411
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2412
    minors = self.cfg.AllocateDRBDMinor([self.new_node
2413
                                         for dev in self.instance.disks],
2414
                                        self.instance.name)
2415
    logging.debug("Allocated minors %r", minors)
2416

    
2417
    iv_names = {}
2418
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
2419
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
2420
                      (self.new_node, idx))
2421
      # create new devices on new_node; note that we create two IDs:
2422
      # one without port, so the drbd will be activated without
2423
      # networking information on the new node at this stage, and one
2424
      # with network, for the later activation in step 4
2425
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
2426
      if self.instance.primary_node == o_node1:
2427
        p_minor = o_minor1
2428
      else:
2429
        assert self.instance.primary_node == o_node2, "Three-node instance?"
2430
        p_minor = o_minor2
2431

    
2432
      new_alone_id = (self.instance.primary_node, self.new_node, None,
2433
                      p_minor, new_minor, o_secret)
2434
      new_net_id = (self.instance.primary_node, self.new_node, o_port,
2435
                    p_minor, new_minor, o_secret)
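      # Illustrative reading: a DRBD8 logical_id is
      # (node_a, node_b, port, minor_a, minor_b, secret); new_alone_id omits
      # the port so the device is first brought up without networking.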
2436

    
2437
      iv_names[idx] = (dev, dev.children, new_net_id)
2438
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
2439
                    new_net_id)
2440
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
2441
                              logical_id=new_alone_id,
2442
                              children=dev.children,
2443
                              size=dev.size,
2444
                              params={})
2445
      (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
2446
                                            self.cfg)
2447
      try:
2448
        CreateSingleBlockDev(self.lu, self.new_node, self.instance,
2449
                             anno_new_drbd,
2450
                             GetInstanceInfoText(self.instance), False,
2451
                             excl_stor)
2452
      except errors.GenericError:
2453
        self.cfg.ReleaseDRBDMinors(self.instance.name)
2454
        raise
2455

    
2456
    # We have new devices, shutdown the drbd on the old secondary
2457
    for idx, dev in enumerate(self.instance.disks):
2458
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
2459
      self.cfg.SetDiskID(dev, self.target_node)
2460
      msg = self.rpc.call_blockdev_shutdown(self.target_node,
2461
                                            (dev, self.instance)).fail_msg
2462
      if msg:
2463
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
2464
                           "node: %s" % (idx, msg),
2465
                           hint=("Please cleanup this device manually as"
2466
                                 " soon as possible"))
2467

    
2468
    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2469
    result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
2470
                                               self.instance.disks)[pnode]
2471

    
2472
    msg = result.fail_msg
2473
    if msg:
2474
      # detaches didn't succeed (unlikely)
2475
      self.cfg.ReleaseDRBDMinors(self.instance.name)
2476
      raise errors.OpExecError("Can't detach the disks from the network on"
2477
                               " old node: %s" % (msg,))
2478

    
2479
    # if we managed to detach at least one, we update all the disks of
2480
    # the instance to point to the new secondary
2481
    self.lu.LogInfo("Updating instance configuration")
2482
    for dev, _, new_logical_id in iv_names.itervalues():
2483
      dev.logical_id = new_logical_id
2484
      self.cfg.SetDiskID(dev, self.instance.primary_node)
2485

    
2486
    self.cfg.Update(self.instance, feedback_fn)
2487

    
2488
    # Release all node locks (the configuration has been updated)
2489
    ReleaseLocks(self.lu, locking.LEVEL_NODE)
2490

    
2491
    # and now perform the drbd attach
2492
    self.lu.LogInfo("Attaching primary drbds to new secondary"
2493
                    " (standalone => connected)")
2494
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2495
                                            self.new_node],
2496
                                           self.node_secondary_ip,
2497
                                           (self.instance.disks, self.instance),
2498
                                           self.instance.name,
2499
                                           False)
2500
    for to_node, to_result in result.items():
2501
      msg = to_result.fail_msg
2502
      if msg:
2503
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
2504
                           to_node, msg,
2505
                           hint=("please do a gnt-instance info to see the"
2506
                                 " status of disks"))
2507

    
2508
    cstep = itertools.count(5)
2509

    
2510
    if self.early_release:
2511
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2512
      self._RemoveOldStorage(self.target_node, iv_names)
2513
      # TODO: Check if releasing locks early still makes sense
2514
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2515
    else:
2516
      # Release all resource locks except those used by the instance
2517
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2518
                   keep=self.node_secondary_ip.keys())
2519

    
2520
    # TODO: Can the instance lock be downgraded here? Take the optional disk
2521
    # shutdown in the caller into consideration.
2522

    
2523
    # Wait for sync
2524
    # This can fail as the old devices are degraded and WaitForSync
2525
    # does a combined result over all disks, so we don't check its return value
2526
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2527
    WaitForSync(self.lu, self.instance)
2528

    
2529
    # Check all devices manually
2530
    self._CheckDevices(self.instance.primary_node, iv_names)
2531

    
2532
    # Step: remove old storage
2533
    if not self.early_release:
2534
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2535
      self._RemoveOldStorage(self.target_node, iv_names)