root / lib / cmdlib / instance_storage.py @ 06c2fb4a

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with storage of instances."""
23

    
24
import itertools
25
import logging
26
import os
27
import time
28

    
29
from ganeti import compat
30
from ganeti import constants
31
from ganeti import errors
32
from ganeti import ht
33
from ganeti import locking
34
from ganeti.masterd import iallocator
35
from ganeti import objects
36
from ganeti import utils
37
from ganeti import opcodes
38
from ganeti import rpc
39
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
40
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
41
  AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeName, \
42
  CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
43
  IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks
44
from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
45
  CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
46
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
47

    
48
import ganeti.masterd.instance
49

    
50

    
51
_DISK_TEMPLATE_NAME_PREFIX = {
52
  constants.DT_PLAIN: "",
53
  constants.DT_RBD: ".rbd",
54
  constants.DT_EXT: ".ext",
55
  }
56

    
57

    
58
_DISK_TEMPLATE_DEVICE_TYPE = {
59
  constants.DT_PLAIN: constants.LD_LV,
60
  constants.DT_FILE: constants.LD_FILE,
61
  constants.DT_SHARED_FILE: constants.LD_FILE,
62
  constants.DT_BLOCK: constants.LD_BLOCKDEV,
63
  constants.DT_RBD: constants.LD_RBD,
64
  constants.DT_EXT: constants.LD_EXT,
65
  }
66

    
67

    
68
def CreateSingleBlockDev(lu, node, instance, device, info, force_open,
69
                         excl_stor):
70
  """Create a single block device on a given node.
71

72
  This will not recurse over children of the device, so they must be
73
  created in advance.
74

75
  @param lu: the lu on whose behalf we execute
76
  @param node: the node on which to create the device
77
  @type instance: L{objects.Instance}
78
  @param instance: the instance which owns the device
79
  @type device: L{objects.Disk}
80
  @param device: the device to create
81
  @param info: the extra 'metadata' we should attach to the device
82
      (this will be represented as an LVM tag)
83
  @type force_open: boolean
84
  @param force_open: this parameter will be passed to the
85
      L{backend.BlockdevCreate} function where it specifies
86
      whether we run on primary or not, and it affects both
87
      the child assembly and the device's own Open() execution
88
  @type excl_stor: boolean
89
  @param excl_stor: Whether exclusive_storage is active for the node
90

91
  """
92
  lu.cfg.SetDiskID(device, node)
93
  result = lu.rpc.call_blockdev_create(node, device, device.size,
94
                                       instance.name, force_open, info,
95
                                       excl_stor)
96
  result.Raise("Can't create block device %s on"
97
               " node %s for instance %s" % (device, node, instance.name))
98
  if device.physical_id is None:
99
    device.physical_id = result.payload
100

    
101

    
102
def _CreateBlockDevInner(lu, node, instance, device, force_create,
103
                         info, force_open, excl_stor):
104
  """Create a tree of block devices on a given node.
105

106
  If this device type has to be created on secondaries, create it and
107
  all its children.
108

109
  If not, just recurse to children keeping the same 'force' value.
110

111
  @attention: The device has to be annotated already.
112

113
  @param lu: the lu on whose behalf we execute
114
  @param node: the node on which to create the device
115
  @type instance: L{objects.Instance}
116
  @param instance: the instance which owns the device
117
  @type device: L{objects.Disk}
118
  @param device: the device to create
119
  @type force_create: boolean
120
  @param force_create: whether to force creation of this device; this
121
      will be changed to True whenever we find a device which has
122
      CreateOnSecondary() attribute
123
  @param info: the extra 'metadata' we should attach to the device
124
      (this will be represented as an LVM tag)
125
  @type force_open: boolean
126
  @param force_open: this parameter will be passed to the
127
      L{backend.BlockdevCreate} function where it specifies
128
      whether we run on primary or not, and it affects both
129
      the child assembly and the device's own Open() execution
130
  @type excl_stor: boolean
131
  @param excl_stor: Whether exclusive_storage is active for the node
132

133
  @return: list of created devices
134
  """
135
  created_devices = []
136
  try:
137
    if device.CreateOnSecondary():
138
      force_create = True
139

    
140
    if device.children:
141
      for child in device.children:
142
        devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
143
                                    info, force_open, excl_stor)
144
        created_devices.extend(devs)
145

    
146
    if not force_create:
147
      return created_devices
148

    
149
    CreateSingleBlockDev(lu, node, instance, device, info, force_open,
150
                         excl_stor)
151
    # The device has been completely created, so there is no point in keeping
152
    # its subdevices in the list. We just add the device itself instead.
153
    created_devices = [(node, device)]
154
    return created_devices
155

    
156
  except errors.DeviceCreationError, e:
157
    e.created_devices.extend(created_devices)
158
    raise e
159
  except errors.OpExecError, e:
160
    raise errors.DeviceCreationError(str(e), created_devices)
161

    
162

    
163
def IsExclusiveStorageEnabledNodeName(cfg, nodename):
164
  """Whether exclusive_storage is in effect for the given node.
165

166
  @type cfg: L{config.ConfigWriter}
167
  @param cfg: The cluster configuration
168
  @type nodename: string
169
  @param nodename: The node
170
  @rtype: bool
171
  @return: The effective value of exclusive_storage
172
  @raise errors.OpPrereqError: if no node exists with the given name
173

174
  """
175
  ni = cfg.GetNodeInfo(nodename)
176
  if ni is None:
177
    raise errors.OpPrereqError("Invalid node name %s" % nodename,
178
                               errors.ECODE_NOENT)
179
  return IsExclusiveStorageEnabledNode(cfg, ni)
180

    
181

    
182
def _CreateBlockDev(lu, node, instance, device, force_create, info,
183
                    force_open):
184
  """Wrapper around L{_CreateBlockDevInner}.
185

186
  This method annotates the root device first.
187

188
  """
189
  (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
190
  excl_stor = IsExclusiveStorageEnabledNodeName(lu.cfg, node)
191
  return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
192
                              force_open, excl_stor)
193

    
194

    
195
def _UndoCreateDisks(lu, disks_created):
196
  """Undo the work performed by L{CreateDisks}.
197

198
  This function is called in case of an error to undo the work of
199
  L{CreateDisks}.
200

201
  @type lu: L{LogicalUnit}
202
  @param lu: the logical unit on whose behalf we execute
203
  @param disks_created: the result returned by L{CreateDisks}
204

205
  """
206
  for (node, disk) in disks_created:
207
    lu.cfg.SetDiskID(disk, node)
208
    result = lu.rpc.call_blockdev_remove(node, disk)
209
    if result.fail_msg:
210
      logging.warning("Failed to remove newly-created disk %s on node %s:"
211
                      " %s", disk, node, result.fail_msg)
212

    
213

    
214
def CreateDisks(lu, instance, to_skip=None, target_node=None, disks=None):
215
  """Create all disks for an instance.
216

217
  This abstracts away some work from AddInstance.
218

219
  @type lu: L{LogicalUnit}
220
  @param lu: the logical unit on whose behalf we execute
221
  @type instance: L{objects.Instance}
222
  @param instance: the instance whose disks we should create
223
  @type to_skip: list
224
  @param to_skip: list of indices to skip
225
  @type target_node: string
226
  @param target_node: if passed, overrides the target node for creation
227
  @type disks: list of L{objects.Disk}
228
  @param disks: the disks to create; if not specified, all the disks of the
229
      instance are created
230
  @return: information about the created disks, to be used to call
231
      L{_UndoCreateDisks}
232
  @raise errors.OpPrereqError: in case of error
233

234
  """
235
  info = GetInstanceInfoText(instance)
236
  if target_node is None:
237
    pnode = instance.primary_node
238
    all_nodes = instance.all_nodes
239
  else:
240
    pnode = target_node
241
    all_nodes = [pnode]
242

    
243
  if disks is None:
244
    disks = instance.disks
245

    
246
  if instance.disk_template in constants.DTS_FILEBASED:
247
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
248
    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
249

    
250
    result.Raise("Failed to create directory '%s' on"
251
                 " node %s" % (file_storage_dir, pnode))
252

    
253
  disks_created = []
254
  for idx, device in enumerate(disks):
255
    if to_skip and idx in to_skip:
256
      continue
257
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
258
    for node in all_nodes:
259
      f_create = node == pnode
260
      try:
261
        _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
262
        disks_created.append((node, device))
263
      except errors.DeviceCreationError, e:
264
        logging.warning("Creating disk %s for instance '%s' failed",
265
                        idx, instance.name)
266
        disks_created.extend(e.created_devices)
267
        _UndoCreateDisks(lu, disks_created)
268
        raise errors.OpExecError(e.message)
269
  return disks_created
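# Typical call pattern (an illustrative sketch only; it mirrors what
# WipeOrCleanupDisks, defined later in this module, does and assumes no
# new API):
#
#   disks_created = CreateDisks(lu, instance)
#   try:
#     WipeDisks(lu, instance)
#   except errors.OpExecError:
#     _UndoCreateDisks(lu, disks_created)
#     raise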
270

    
271

    
272
def ComputeDiskSizePerVG(disk_template, disks):
273
  """Compute disk size requirements in the volume group
274

275
  """
276
  def _compute(disks, payload):
277
    """Sum requested sizes per volume group, plus C{payload} MiB per disk.
278

279
    """
280
    vgs = {}
281
    for disk in disks:
282
      vgs[disk[constants.IDISK_VG]] = \
283
        vgs.get(disk[constants.IDISK_VG], 0) + \
        disk[constants.IDISK_SIZE] + payload
284

    
285
    return vgs
286

    
287
  # Required free disk space per volume group, by disk template
288
  req_size_dict = {
289
    constants.DT_DISKLESS: {},
290
    constants.DT_PLAIN: _compute(disks, 0),
291
    # 128 MiB is added per disk for DRBD metadata
292
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
293
    constants.DT_FILE: {},
294
    constants.DT_SHARED_FILE: {},
295
    }
296

    
297
  if disk_template not in req_size_dict:
298
    raise errors.ProgrammerError("Disk template '%s' size requirement"
299
                                 " is unknown" % disk_template)
300

    
301
  return req_size_dict[disk_template]
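# Illustrative example (not part of the original module): for two disks in
# different volume groups the result maps each VG to its summed requirement,
# with constants.DRBD_META_SIZE (128 MiB) added per disk for DRBD:
#
#   >>> disks = [{constants.IDISK_VG: "xenvg", constants.IDISK_SIZE: 10240},
#   ...          {constants.IDISK_VG: "othervg", constants.IDISK_SIZE: 2048}]
#   >>> ComputeDiskSizePerVG(constants.DT_PLAIN, disks)
#   {'xenvg': 10240, 'othervg': 2048}
#   >>> ComputeDiskSizePerVG(constants.DT_DRBD8, disks)
#   {'xenvg': 10368, 'othervg': 2176}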
302

    
303

    
304
def ComputeDisks(op, default_vg):
305
  """Computes the instance disks.
306

307
  @param op: The instance opcode
308
  @param default_vg: The default_vg to assume
309

310
  @return: The computed disks
311

312
  """
313
  disks = []
314
  for disk in op.disks:
315
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
316
    if mode not in constants.DISK_ACCESS_SET:
317
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
318
                                 mode, errors.ECODE_INVAL)
319
    size = disk.get(constants.IDISK_SIZE, None)
320
    if size is None:
321
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
322
    try:
323
      size = int(size)
324
    except (TypeError, ValueError):
325
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
326
                                 errors.ECODE_INVAL)
327

    
328
    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
329
    if ext_provider and op.disk_template != constants.DT_EXT:
330
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
331
                                 " disk template, not %s" %
332
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
333
                                  op.disk_template), errors.ECODE_INVAL)
334

    
335
    data_vg = disk.get(constants.IDISK_VG, default_vg)
336
    name = disk.get(constants.IDISK_NAME, None)
337
    if name is not None and name.lower() == constants.VALUE_NONE:
338
      name = None
339
    new_disk = {
340
      constants.IDISK_SIZE: size,
341
      constants.IDISK_MODE: mode,
342
      constants.IDISK_VG: data_vg,
343
      constants.IDISK_NAME: name,
344
      }
345

    
346
    if constants.IDISK_METAVG in disk:
347
      new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG]
348
    if constants.IDISK_ADOPT in disk:
349
      new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]
350

    
351
    # For extstorage, demand the `provider' option and add any
352
    # additional parameters (ext-params) to the dict
353
    if op.disk_template == constants.DT_EXT:
354
      if ext_provider:
355
        new_disk[constants.IDISK_PROVIDER] = ext_provider
356
        for key in disk:
357
          if key not in constants.IDISK_PARAMS:
358
            new_disk[key] = disk[key]
359
      else:
360
        raise errors.OpPrereqError("Missing provider for template '%s'" %
361
                                   constants.DT_EXT, errors.ECODE_INVAL)
362

    
363
    disks.append(new_disk)
364

    
365
  return disks
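# Illustrative example (assuming op.disk_template is not constants.DT_EXT):
# a single opcode disk {constants.IDISK_SIZE: "1024"} with default_vg "xenvg"
# is normalized to
#
#   {constants.IDISK_SIZE: 1024, constants.IDISK_MODE: constants.DISK_RDWR,
#    constants.IDISK_VG: "xenvg", constants.IDISK_NAME: None}
#
# i.e. the size is converted to an integer and missing fields get defaults.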
366

    
367

    
368
def CheckRADOSFreeSpace():
369
  """Compute disk size requirements inside the RADOS cluster.
370

371
  """
372
  # For the RADOS cluster we assume there is always enough space.
373
  pass
374

    
375

    
376
def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
377
                         iv_name, p_minor, s_minor):
378
  """Generate a drbd8 device complete with its children.
379

380
  """
381
  assert len(vgnames) == len(names) == 2
382
  port = lu.cfg.AllocatePort()
383
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
384

    
385
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
386
                          logical_id=(vgnames[0], names[0]),
387
                          params={})
388
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
389
  dev_meta = objects.Disk(dev_type=constants.LD_LV,
390
                          size=constants.DRBD_META_SIZE,
391
                          logical_id=(vgnames[1], names[1]),
392
                          params={})
393
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
394
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
395
                          logical_id=(primary, secondary, port,
396
                                      p_minor, s_minor,
397
                                      shared_secret),
398
                          children=[dev_data, dev_meta],
399
                          iv_name=iv_name, params={})
400
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
401
  return drbd_dev
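# Resulting structure (illustrative sketch of what the code above builds):
#
#   drbd_dev (LD_DRBD8, size=size,
#             logical_id=(primary, secondary, port, p_minor, s_minor, secret))
#   ├── dev_data (LD_LV, size=size, logical_id=(vgnames[0], names[0]))
#   └── dev_meta (LD_LV, size=DRBD_META_SIZE, logical_id=(vgnames[1], names[1]))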
402

    
403

    
404
def GenerateDiskTemplate(
405
  lu, template_name, instance_name, primary_node, secondary_nodes,
406
  disk_info, file_storage_dir, file_driver, base_index,
407
  feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
408
  _req_shr_file_storage=opcodes.RequireSharedFileStorage):
409
  """Generate the entire disk layout for a given template type.
410

411
  """
412
  vgname = lu.cfg.GetVGName()
413
  disk_count = len(disk_info)
414
  disks = []
415

    
416
  if template_name == constants.DT_DISKLESS:
417
    pass
418
  elif template_name == constants.DT_DRBD8:
419
    if len(secondary_nodes) != 1:
420
      raise errors.ProgrammerError("Wrong template configuration")
421
    remote_node = secondary_nodes[0]
422
    minors = lu.cfg.AllocateDRBDMinor(
423
      [primary_node, remote_node] * len(disk_info), instance_name)
424

    
425
    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
426
                                                       full_disk_params)
427
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
428

    
429
    names = []
430
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
431
                                               for i in range(disk_count)]):
432
      names.append(lv_prefix + "_data")
433
      names.append(lv_prefix + "_meta")
434
    for idx, disk in enumerate(disk_info):
435
      disk_index = idx + base_index
436
      data_vg = disk.get(constants.IDISK_VG, vgname)
437
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
438
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
439
                                      disk[constants.IDISK_SIZE],
440
                                      [data_vg, meta_vg],
441
                                      names[idx * 2:idx * 2 + 2],
442
                                      "disk/%d" % disk_index,
443
                                      minors[idx * 2], minors[idx * 2 + 1])
444
      disk_dev.mode = disk[constants.IDISK_MODE]
445
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
446
      disks.append(disk_dev)
447
  else:
448
    if secondary_nodes:
449
      raise errors.ProgrammerError("Wrong template configuration")
450

    
451
    if template_name == constants.DT_FILE:
452
      _req_file_storage()
453
    elif template_name == constants.DT_SHARED_FILE:
454
      _req_shr_file_storage()
455

    
456
    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
457
    if name_prefix is None:
458
      names = None
459
    else:
460
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
461
                                        (name_prefix, base_index + i)
462
                                        for i in range(disk_count)])
463

    
464
    if template_name == constants.DT_PLAIN:
465

    
466
      def logical_id_fn(idx, _, disk):
467
        vg = disk.get(constants.IDISK_VG, vgname)
468
        return (vg, names[idx])
469

    
470
    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
471
      logical_id_fn = \
472
        lambda _, disk_index, disk: (file_driver,
473
                                     "%s/disk%d" % (file_storage_dir,
474
                                                    disk_index))
475
    elif template_name == constants.DT_BLOCK:
476
      logical_id_fn = \
477
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
478
                                       disk[constants.IDISK_ADOPT])
479
    elif template_name == constants.DT_RBD:
480
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
481
    elif template_name == constants.DT_EXT:
482
      def logical_id_fn(idx, _, disk):
483
        provider = disk.get(constants.IDISK_PROVIDER, None)
484
        if provider is None:
485
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
486
                                       " not found", constants.DT_EXT,
487
                                       constants.IDISK_PROVIDER)
488
        return (provider, names[idx])
489
    else:
490
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
491

    
492
    dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
493

    
494
    for idx, disk in enumerate(disk_info):
495
      params = {}
496
      # Only for the Ext template add disk_info to params
497
      if template_name == constants.DT_EXT:
498
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
499
        for key in disk:
500
          if key not in constants.IDISK_PARAMS:
501
            params[key] = disk[key]
502
      disk_index = idx + base_index
503
      size = disk[constants.IDISK_SIZE]
504
      feedback_fn("* disk %s, size %s" %
505
                  (disk_index, utils.FormatUnit(size, "h")))
506
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
507
                              logical_id=logical_id_fn(idx, disk_index, disk),
508
                              iv_name="disk/%d" % disk_index,
509
                              mode=disk[constants.IDISK_MODE],
510
                              params=params)
511
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
512
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
513
      disks.append(disk_dev)
514

    
515
  return disks
516

    
517

    
518
class LUInstanceRecreateDisks(LogicalUnit):
519
  """Recreate an instance's missing disks.
520

521
  """
522
  HPATH = "instance-recreate-disks"
523
  HTYPE = constants.HTYPE_INSTANCE
524
  REQ_BGL = False
525

    
526
  _MODIFYABLE = compat.UniqueFrozenset([
527
    constants.IDISK_SIZE,
528
    constants.IDISK_MODE,
529
    ])
530

    
531
  # New or changed disk parameters may have different semantics
532
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
533
    constants.IDISK_ADOPT,
534

    
535
    # TODO: Implement support changing VG while recreating
536
    constants.IDISK_VG,
537
    constants.IDISK_METAVG,
538
    constants.IDISK_PROVIDER,
539
    constants.IDISK_NAME,
540
    constants.IDISK_SNAPSHOT_NAME,
541
    ]))
542

    
543
  def _RunAllocator(self):
544
    """Run the allocator based on input opcode.
545

546
    """
547
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
548

    
549
    # FIXME
550
    # The allocator should actually run in "relocate" mode, but current
551
    # allocators don't support relocating all the nodes of an instance at
552
    # the same time. As a workaround we use "allocate" mode, but this is
553
    # suboptimal for two reasons:
554
    # - The instance name passed to the allocator is present in the list of
555
    #   existing instances, so there could be a conflict within the
556
    #   internal structures of the allocator. This doesn't happen with the
557
    #   current allocators, but it's a liability.
558
    # - The allocator counts the resources used by the instance twice: once
559
    #   because the instance exists already, and once because it tries to
560
    #   allocate a new instance.
561
    # The allocator could choose some of the nodes on which the instance is
562
    # running, but that's not a problem. If the instance nodes are broken,
563
    # they should be already be marked as drained or offline, and hence
564
    # skipped by the allocator. If instance disks have been lost for other
565
    # reasons, then recreating the disks on the same nodes should be fine.
566
    disk_template = self.instance.disk_template
567
    spindle_use = be_full[constants.BE_SPINDLE_USE]
568
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
569
                                        disk_template=disk_template,
570
                                        tags=list(self.instance.GetTags()),
571
                                        os=self.instance.os,
572
                                        nics=[{}],
573
                                        vcpus=be_full[constants.BE_VCPUS],
574
                                        memory=be_full[constants.BE_MAXMEM],
575
                                        spindle_use=spindle_use,
576
                                        disks=[{constants.IDISK_SIZE: d.size,
577
                                                constants.IDISK_MODE: d.mode}
578
                                               for d in self.instance.disks],
579
                                        hypervisor=self.instance.hypervisor,
580
                                        node_whitelist=None)
581
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
582

    
583
    ial.Run(self.op.iallocator)
584

    
585
    assert req.RequiredNodes() == len(self.instance.all_nodes)
586

    
587
    if not ial.success:
588
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
589
                                 " %s" % (self.op.iallocator, ial.info),
590
                                 errors.ECODE_NORES)
591

    
592
    self.op.nodes = ial.result
593
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
594
                 self.op.instance_name, self.op.iallocator,
595
                 utils.CommaJoin(ial.result))
596

    
597
  def CheckArguments(self):
598
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
599
      # Normalize and convert deprecated list of disk indices
600
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
601

    
602
    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
603
    if duplicates:
604
      raise errors.OpPrereqError("Some disks have been specified more than"
605
                                 " once: %s" % utils.CommaJoin(duplicates),
606
                                 errors.ECODE_INVAL)
607

    
608
    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
609
    # when neither iallocator nor nodes are specified
610
    if self.op.iallocator or self.op.nodes:
611
      CheckIAllocatorOrNode(self, "iallocator", "nodes")
612

    
613
    for (idx, params) in self.op.disks:
614
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
615
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
616
      if unsupported:
617
        raise errors.OpPrereqError("Parameters for disk %s try to change"
618
                                   " unmodifyable parameter(s): %s" %
619
                                   (idx, utils.CommaJoin(unsupported)),
620
                                   errors.ECODE_INVAL)
621

    
622
  def ExpandNames(self):
623
    self._ExpandAndLockInstance()
624
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
625

    
626
    if self.op.nodes:
627
      self.op.nodes = [ExpandNodeName(self.cfg, n) for n in self.op.nodes]
628
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
629
    else:
630
      self.needed_locks[locking.LEVEL_NODE] = []
631
      if self.op.iallocator:
632
        # iallocator will select a new node in the same group
633
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
634
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
635

    
636
    self.needed_locks[locking.LEVEL_NODE_RES] = []
637

    
638
  def DeclareLocks(self, level):
639
    if level == locking.LEVEL_NODEGROUP:
640
      assert self.op.iallocator is not None
641
      assert not self.op.nodes
642
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
643
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
644
      # Lock the primary group used by the instance optimistically; this
645
      # requires going via the node before it's locked, requiring
646
      # verification later on
647
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
648
        self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)
649

    
650
    elif level == locking.LEVEL_NODE:
651
      # If an allocator is used, then we lock all the nodes in the current
652
      # instance group, as we don't know yet which ones will be selected;
653
      # if we replace the nodes without using an allocator, locks are
654
      # already declared in ExpandNames; otherwise, we need to lock all the
655
      # instance nodes for disk re-creation
656
      if self.op.iallocator:
657
        assert not self.op.nodes
658
        assert not self.needed_locks[locking.LEVEL_NODE]
659
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
660

    
661
        # Lock member nodes of the group of the primary node
662
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
663
          self.needed_locks[locking.LEVEL_NODE].extend(
664
            self.cfg.GetNodeGroup(group_uuid).members)
665

    
666
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
667
      elif not self.op.nodes:
668
        self._LockInstancesNodes(primary_only=False)
669
    elif level == locking.LEVEL_NODE_RES:
670
      # Copy node locks
671
      self.needed_locks[locking.LEVEL_NODE_RES] = \
672
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])
673

    
674
  def BuildHooksEnv(self):
675
    """Build hooks env.
676

677
    This runs on master, primary and secondary nodes of the instance.
678

679
    """
680
    return BuildInstanceHookEnvByObject(self, self.instance)
681

    
682
  def BuildHooksNodes(self):
683
    """Build hooks nodes.
684

685
    """
686
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
687
    return (nl, nl)
688

    
689
  def CheckPrereq(self):
690
    """Check prerequisites.
691

692
    This checks that the instance is in the cluster and is not running.
693

694
    """
695
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
696
    assert instance is not None, \
697
      "Cannot retrieve locked instance %s" % self.op.instance_name
698
    if self.op.nodes:
699
      if len(self.op.nodes) != len(instance.all_nodes):
700
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
701
                                   " %d replacement nodes were specified" %
702
                                   (instance.name, len(instance.all_nodes),
703
                                    len(self.op.nodes)),
704
                                   errors.ECODE_INVAL)
705
      assert instance.disk_template != constants.DT_DRBD8 or \
706
             len(self.op.nodes) == 2
707
      assert instance.disk_template != constants.DT_PLAIN or \
708
             len(self.op.nodes) == 1
709
      primary_node = self.op.nodes[0]
710
    else:
711
      primary_node = instance.primary_node
712
    if not self.op.iallocator:
713
      CheckNodeOnline(self, primary_node)
714

    
715
    if instance.disk_template == constants.DT_DISKLESS:
716
      raise errors.OpPrereqError("Instance '%s' has no disks" %
717
                                 self.op.instance_name, errors.ECODE_INVAL)
718

    
719
    # Verify if node group locks are still correct
720
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
721
    if owned_groups:
722
      # Node group locks are acquired only for the primary node (and only
723
      # when the allocator is used)
724
      CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
725
                              primary_only=True)
726

    
727
    # if we replace nodes *and* the old primary is offline, we don't
728
    # check the instance state
729
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
730
    if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
731
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
732
                         msg="cannot recreate disks")
733

    
734
    if self.op.disks:
735
      self.disks = dict(self.op.disks)
736
    else:
737
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
738

    
739
    maxidx = max(self.disks.keys())
740
    if maxidx >= len(instance.disks):
741
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
742
                                 errors.ECODE_INVAL)
743

    
744
    if ((self.op.nodes or self.op.iallocator) and
745
         sorted(self.disks.keys()) != range(len(instance.disks))):
746
      raise errors.OpPrereqError("Can't recreate disks partially and"
747
                                 " change the nodes at the same time",
748
                                 errors.ECODE_INVAL)
749

    
750
    self.instance = instance
751

    
752
    if self.op.iallocator:
753
      self._RunAllocator()
754
      # Release unneeded node and node resource locks
755
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
756
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
757
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
758

    
759
    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
760

    
761
  def Exec(self, feedback_fn):
762
    """Recreate the disks.
763

764
    """
765
    instance = self.instance
766

    
767
    assert (self.owned_locks(locking.LEVEL_NODE) ==
768
            self.owned_locks(locking.LEVEL_NODE_RES))
769

    
770
    to_skip = []
771
    mods = [] # keeps track of needed changes
772

    
773
    for idx, disk in enumerate(instance.disks):
774
      try:
775
        changes = self.disks[idx]
776
      except KeyError:
777
        # Disk should not be recreated
778
        to_skip.append(idx)
779
        continue
780

    
781
      # update secondaries for disks, if needed
782
      if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
783
        # need to update the nodes and minors
784
        assert len(self.op.nodes) == 2
785
        assert len(disk.logical_id) == 6 # otherwise disk internals
786
                                         # have changed
787
        (_, _, old_port, _, _, old_secret) = disk.logical_id
788
        new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
789
        new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
790
                  new_minors[0], new_minors[1], old_secret)
791
        assert len(disk.logical_id) == len(new_id)
792
      else:
793
        new_id = None
794

    
795
      mods.append((idx, new_id, changes))
796

    
797
    # now that we have passed all asserts above, we can apply the mods
798
    # in a single run (to avoid partial changes)
799
    for idx, new_id, changes in mods:
800
      disk = instance.disks[idx]
801
      if new_id is not None:
802
        assert disk.dev_type == constants.LD_DRBD8
803
        disk.logical_id = new_id
804
      if changes:
805
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
806
                    mode=changes.get(constants.IDISK_MODE, None))
807

    
808
    # change primary node, if needed
809
    if self.op.nodes:
810
      instance.primary_node = self.op.nodes[0]
811
      self.LogWarning("Changing the instance's nodes, you will have to"
812
                      " remove any disks left on the older nodes manually")
813

    
814
    if self.op.nodes:
815
      self.cfg.Update(instance, feedback_fn)
816

    
817
    # All touched nodes must be locked
818
    mylocks = self.owned_locks(locking.LEVEL_NODE)
819
    assert mylocks.issuperset(frozenset(instance.all_nodes))
820
    new_disks = CreateDisks(self, instance, to_skip=to_skip)
821

    
822
    # TODO: Release node locks before wiping, or explain why it's not possible
823
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
824
      wipedisks = [(idx, disk, 0)
825
                   for (idx, disk) in enumerate(instance.disks)
826
                   if idx not in to_skip]
827
      WipeOrCleanupDisks(self, instance, disks=wipedisks, cleanup=new_disks)
828

    
829

    
830
def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
831
  """Checks if nodes have enough free disk space in the specified VG.
832

833
  This function checks if all given nodes have the needed amount of
834
  free disk. In case any node has less disk or we cannot get the
835
  information from the node, this function raises an OpPrereqError
836
  exception.
837

838
  @type lu: C{LogicalUnit}
839
  @param lu: a logical unit from which we get configuration data
840
  @type nodenames: C{list}
841
  @param nodenames: the list of node names to check
842
  @type vg: C{str}
843
  @param vg: the volume group to check
844
  @type requested: C{int}
845
  @param requested: the amount of disk in MiB to check for
846
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
847
      or we cannot check the node
848

849
  """
850
  es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
851
  nodeinfo = lu.rpc.call_node_info(nodenames, [vg], None, es_flags)
852
  for node in nodenames:
853
    info = nodeinfo[node]
854
    info.Raise("Cannot get current information from node %s" % node,
855
               prereq=True, ecode=errors.ECODE_ENVIRON)
856
    (_, (vg_info, ), _) = info.payload
857
    vg_free = vg_info.get("vg_free", None)
858
    if not isinstance(vg_free, int):
859
      raise errors.OpPrereqError("Can't compute free disk space on node"
860
                                 " %s for vg %s, result was '%s'" %
861
                                 (node, vg, vg_free), errors.ECODE_ENVIRON)
862
    if requested > vg_free:
863
      raise errors.OpPrereqError("Not enough disk space on target node %s"
864
                                 " vg %s: required %d MiB, available %d MiB" %
865
                                 (node, vg, requested, vg_free),
866
                                 errors.ECODE_NORES)
867

    
868

    
869
def CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
870
  """Checks if nodes have enough free disk space in all the VGs.
871

872
  This function checks if all given nodes have the needed amount of
873
  free disk. In case any node has less disk or we cannot get the
874
  information from the node, this function raises an OpPrereqError
875
  exception.
876

877
  @type lu: C{LogicalUnit}
878
  @param lu: a logical unit from which we get configuration data
879
  @type nodenames: C{list}
880
  @param nodenames: the list of node names to check
881
  @type req_sizes: C{dict}
882
  @param req_sizes: the hash of vg and corresponding amount of disk in
883
      MiB to check for
884
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
885
      or we cannot check the node
886

887
  """
888
  for vg, req_size in req_sizes.items():
889
    _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
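# Illustrative usage (a sketch, not a prescribed API; disk_template and
# new_disks are hypothetical locals): req_sizes is typically the result of
# ComputeDiskSizePerVG above, e.g.
#
#   req_sizes = ComputeDiskSizePerVG(disk_template, new_disks)
#   CheckNodesFreeDiskPerVG(self, nodenames, req_sizes)
#
# which checks every node in nodenames against the summed space per VG.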
890

    
891

    
892
def _DiskSizeInBytesToMebibytes(lu, size):
893
  """Converts a disk size in bytes to mebibytes.
894

895
  Warns and rounds up if the size isn't an even multiple of 1 MiB.
896

897
  """
898
  (mib, remainder) = divmod(size, 1024 * 1024)
899

    
900
  if remainder != 0:
901
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
902
                  " to not overwrite existing data (%s bytes will not be"
903
                  " wiped)", (1024 * 1024) - remainder)
904
    mib += 1
905

    
906
  return mib
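# Worked example (illustrative): 1073741825 bytes (1 GiB + 1 byte) gives
# divmod(size, 1048576) == (1024, 1), so a warning is logged and 1025 MiB is
# returned; an exact 2 GiB (2147483648 bytes) returns 2048 with no warning.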
907

    
908

    
909
def _CalcEta(time_taken, written, total_size):
910
  """Calculates the ETA based on size written and total size.
911

912
  @param time_taken: The time taken so far
913
  @param written: amount written so far
914
  @param total_size: The total size of data to be written
915
  @return: The remaining time in seconds
916

917
  """
918
  avg_time = time_taken / float(written)
919
  return (total_size - written) * avg_time
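# Worked example (illustrative): after 30 seconds with 1024 MiB of a 4096 MiB
# disk written, _CalcEta(30, 1024, 4096) returns
# (4096 - 1024) * (30 / 1024.0) == 90.0 seconds remaining.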
920

    
921

    
922
def WipeDisks(lu, instance, disks=None):
923
  """Wipes instance disks.
924

925
  @type lu: L{LogicalUnit}
926
  @param lu: the logical unit on whose behalf we execute
927
  @type instance: L{objects.Instance}
928
  @param instance: the instance whose disks we should create
929
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
930
  @param disks: Disk details; tuple contains disk index, disk object and the
931
    start offset
932

933
  """
934
  node = instance.primary_node
935

    
936
  if disks is None:
937
    disks = [(idx, disk, 0)
938
             for (idx, disk) in enumerate(instance.disks)]
939

    
940
  for (_, device, _) in disks:
941
    lu.cfg.SetDiskID(device, node)
942

    
943
  logging.info("Pausing synchronization of disks of instance '%s'",
944
               instance.name)
945
  result = lu.rpc.call_blockdev_pause_resume_sync(node,
946
                                                  (map(compat.snd, disks),
947
                                                   instance),
948
                                                  True)
949
  result.Raise("Failed to pause disk synchronization on node '%s'" % node)
950

    
951
  for idx, success in enumerate(result.payload):
952
    if not success:
953
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
954
                   " failed", idx, instance.name)
955

    
956
  try:
957
    for (idx, device, offset) in disks:
958
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
959
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
960
      wipe_chunk_size = \
961
        int(min(constants.MAX_WIPE_CHUNK,
962
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
963

    
964
      size = device.size
965
      last_output = 0
966
      start_time = time.time()
967

    
968
      if offset == 0:
969
        info_text = ""
970
      else:
971
        info_text = (" (from %s to %s)" %
972
                     (utils.FormatUnit(offset, "h"),
973
                      utils.FormatUnit(size, "h")))
974

    
975
      lu.LogInfo("* Wiping disk %s%s", idx, info_text)
976

    
977
      logging.info("Wiping disk %d for instance %s on node %s using"
978
                   " chunk size %s", idx, instance.name, node, wipe_chunk_size)
979

    
980
      while offset < size:
981
        wipe_size = min(wipe_chunk_size, size - offset)
982

    
983
        logging.debug("Wiping disk %d, offset %s, chunk %s",
984
                      idx, offset, wipe_size)
985

    
986
        result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
987
                                           wipe_size)
988
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
989
                     (idx, offset, wipe_size))
990

    
991
        now = time.time()
992
        offset += wipe_size
993
        if now - last_output >= 60:
994
          eta = _CalcEta(now - start_time, offset, size)
995
          lu.LogInfo(" - done: %.1f%% ETA: %s",
996
                     offset / float(size) * 100, utils.FormatSeconds(eta))
997
          last_output = now
998
  finally:
999
    logging.info("Resuming synchronization of disks for instance '%s'",
1000
                 instance.name)
1001

    
1002
    result = lu.rpc.call_blockdev_pause_resume_sync(node,
1003
                                                    (map(compat.snd, disks),
1004
                                                     instance),
1005
                                                    False)
1006

    
1007
    if result.fail_msg:
1008
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
1009
                    node, result.fail_msg)
1010
    else:
1011
      for idx, success in enumerate(result.payload):
1012
        if not success:
1013
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
1014
                        " failed", idx, instance.name)
1015

    
1016

    
1017
def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
1018
  """Wrapper for L{WipeDisks} that handles errors.
1019

1020
  @type lu: L{LogicalUnit}
1021
  @param lu: the logical unit on whose behalf we execute
1022
  @type instance: L{objects.Instance}
1023
  @param instance: the instance whose disks we should wipe
1024
  @param disks: see L{WipeDisks}
1025
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
1026
      case of error
1027
  @raise errors.OpPrereqError: in case of failure
1028

1029
  """
1030
  try:
1031
    WipeDisks(lu, instance, disks=disks)
1032
  except errors.OpExecError:
1033
    logging.warning("Wiping disks for instance '%s' failed",
1034
                    instance.name)
1035
    _UndoCreateDisks(lu, cleanup)
1036
    raise
1037

    
1038

    
1039
def ExpandCheckDisks(instance, disks):
1040
  """Return the instance disks selected by the disks list
1041

1042
  @type disks: list of L{objects.Disk} or None
1043
  @param disks: selected disks
1044
  @rtype: list of L{objects.Disk}
1045
  @return: selected instance disks to act on
1046

1047
  """
1048
  if disks is None:
1049
    return instance.disks
1050
  else:
1051
    if not set(disks).issubset(instance.disks):
1052
      raise errors.ProgrammerError("Can only act on disks belonging to the"
1053
                                   " target instance: expected a subset of %r,"
1054
                                   " got %r" % (instance.disks, disks))
1055
    return disks
1056

    
1057

    
1058
def WaitForSync(lu, instance, disks=None, oneshot=False):
1059
  """Sleep and poll for an instance's disk to sync.
1060

1061
  """
1062
  if not instance.disks or disks is not None and not disks:
1063
    return True
1064

    
1065
  disks = ExpandCheckDisks(instance, disks)
1066

    
1067
  if not oneshot:
1068
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1069

    
1070
  node = instance.primary_node
1071

    
1072
  for dev in disks:
1073
    lu.cfg.SetDiskID(dev, node)
1074

    
1075
  # TODO: Convert to utils.Retry
1076

    
1077
  retries = 0
1078
  degr_retries = 10 # in seconds, as we sleep 1 second each time
1079
  while True:
1080
    max_time = 0
1081
    done = True
1082
    cumul_degraded = False
1083
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
1084
    msg = rstats.fail_msg
1085
    if msg:
1086
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1087
      retries += 1
1088
      if retries >= 10:
1089
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1090
                                 " aborting." % node)
1091
      time.sleep(6)
1092
      continue
1093
    rstats = rstats.payload
1094
    retries = 0
1095
    for i, mstat in enumerate(rstats):
1096
      if mstat is None:
1097
        lu.LogWarning("Can't compute data for node %s/%s",
1098
                      node, disks[i].iv_name)
1099
        continue
1100

    
1101
      cumul_degraded = (cumul_degraded or
1102
                        (mstat.is_degraded and mstat.sync_percent is None))
1103
      if mstat.sync_percent is not None:
1104
        done = False
1105
        if mstat.estimated_time is not None:
1106
          rem_time = ("%s remaining (estimated)" %
1107
                      utils.FormatSeconds(mstat.estimated_time))
1108
          max_time = mstat.estimated_time
1109
        else:
1110
          rem_time = "no time estimate"
1111
        lu.LogInfo("- device %s: %5.2f%% done, %s",
1112
                   disks[i].iv_name, mstat.sync_percent, rem_time)
1113

    
1114
    # if we're done but degraded, let's do a few small retries, to
1115
    # make sure we see a stable and not transient situation; therefore
1116
    # we force restart of the loop
1117
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
1118
      logging.info("Degraded disks found, %d retries left", degr_retries)
1119
      degr_retries -= 1
1120
      time.sleep(1)
1121
      continue
1122

    
1123
    if done or oneshot:
1124
      break
1125

    
1126
    time.sleep(min(60, max_time))
1127

    
1128
  if done:
1129
    lu.LogInfo("Instance %s's disks are in sync", instance.name)
1130

    
1131
  return not cumul_degraded
1132

    
1133

    
1134
def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1135
  """Shutdown block devices of an instance.
1136

1137
  This does the shutdown on all nodes of the instance.
1138

1139
  If ignore_primary is true, errors on the primary node are
1140
  ignored.
1141

1142
  """
1143
  all_result = True
1144

    
1145
  if disks is None:
1146
    # only mark instance disks as inactive if all disks are affected
1147
    lu.cfg.MarkInstanceDisksInactive(instance.name)
1148

    
1149
  disks = ExpandCheckDisks(instance, disks)
1150

    
1151
  for disk in disks:
1152
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1153
      lu.cfg.SetDiskID(top_disk, node)
1154
      result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
1155
      msg = result.fail_msg
1156
      if msg:
1157
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1158
                      disk.iv_name, node, msg)
1159
        if ((node == instance.primary_node and not ignore_primary) or
1160
            (node != instance.primary_node and not result.offline)):
1161
          all_result = False
1162
  return all_result
1163

    
1164

    
1165
def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1166
  """Shutdown block devices of an instance.
1167

1168
  This function checks that the instance is down before calling
1169
  L{ShutdownInstanceDisks}.
1170

1171
  """
1172
  CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1173
  ShutdownInstanceDisks(lu, instance, disks=disks)
1174

    
1175

    
1176
def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1177
                          ignore_size=False):
1178
  """Prepare the block devices for an instance.
1179

1180
  This sets up the block devices on all nodes.
1181

1182
  @type lu: L{LogicalUnit}
1183
  @param lu: the logical unit on whose behalf we execute
1184
  @type instance: L{objects.Instance}
1185
  @param instance: the instance for whose disks we assemble
1186
  @type disks: list of L{objects.Disk} or None
1187
  @param disks: which disks to assemble (or all, if None)
1188
  @type ignore_secondaries: boolean
1189
  @param ignore_secondaries: if true, errors on secondary nodes
1190
      won't result in an error return from the function
1191
  @type ignore_size: boolean
1192
  @param ignore_size: if true, the current known size of the disk
1193
      will not be used during the disk activation, useful for cases
1194
      when the size is wrong
1195
  @return: False if the operation failed, otherwise a list of
1196
      (host, instance_visible_name, node_visible_name)
1197
      with the mapping from node devices to instance devices
1198

1199
  """
1200
  device_info = []
1201
  disks_ok = True
1202
  iname = instance.name
1203

    
1204
  if disks is None:
1205
    # only mark instance disks as active if all disks are affected
1206
    lu.cfg.MarkInstanceDisksActive(iname)
1207

    
1208
  disks = ExpandCheckDisks(instance, disks)
1209

    
1210
  # With the two-pass mechanism we try to reduce the window of
1211
  # opportunity for the race condition of switching DRBD to primary
1212
  # before handshaking occurred, but we do not eliminate it
1213

    
1214
  # The proper fix would be to wait (with some limits) until the
1215
  # connection has been made and drbd transitions from WFConnection
1216
  # into any other network-connected state (Connected, SyncTarget,
1217
  # SyncSource, etc.)
1218

    
1219
  # 1st pass, assemble on all nodes in secondary mode
1220
  for idx, inst_disk in enumerate(disks):
1221
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1222
      if ignore_size:
1223
        node_disk = node_disk.Copy()
1224
        node_disk.UnsetSize()
1225
      lu.cfg.SetDiskID(node_disk, node)
1226
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1227
                                             False, idx)
1228
      msg = result.fail_msg
1229
      if msg:
1230
        is_offline_secondary = (node in instance.secondary_nodes and
1231
                                result.offline)
1232
        lu.LogWarning("Could not prepare block device %s on node %s"
1233
                      " (is_primary=False, pass=1): %s",
1234
                      inst_disk.iv_name, node, msg)
1235
        if not (ignore_secondaries or is_offline_secondary):
1236
          disks_ok = False
1237

    
1238
  # FIXME: race condition on drbd migration to primary
1239

    
1240
  # 2nd pass, do only the primary node
1241
  for idx, inst_disk in enumerate(disks):
1242
    dev_path = None
1243

    
1244
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1245
      if node != instance.primary_node:
1246
        continue
1247
      if ignore_size:
1248
        node_disk = node_disk.Copy()
1249
        node_disk.UnsetSize()
1250
      lu.cfg.SetDiskID(node_disk, node)
1251
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1252
                                             True, idx)
1253
      msg = result.fail_msg
1254
      if msg:
1255
        lu.LogWarning("Could not prepare block device %s on node %s"
1256
                      " (is_primary=True, pass=2): %s",
1257
                      inst_disk.iv_name, node, msg)
1258
        disks_ok = False
1259
      else:
1260
        dev_path, _ = result.payload
1261

    
1262
    device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
1263

    
1264
  # leave the disks configured for the primary node
1265
  # this is a workaround that would be fixed better by
1266
  # improving the logical/physical id handling
1267
  for disk in disks:
1268
    lu.cfg.SetDiskID(disk, instance.primary_node)
1269

    
1270
  if not disks_ok:
1271
    lu.cfg.MarkInstanceDisksInactive(iname)
1272

    
1273
  return disks_ok, device_info
1274

    
1275

    
1276
def StartInstanceDisks(lu, instance, force):
1277
  """Start the disks of an instance.
1278

1279
  """
1280
  disks_ok, _ = AssembleInstanceDisks(lu, instance,
1281
                                      ignore_secondaries=force)
1282
  if not disks_ok:
1283
    ShutdownInstanceDisks(lu, instance)
1284
    if force is not None and not force:
1285
      lu.LogWarning("",
1286
                    hint=("If the message above refers to a secondary node,"
1287
                          " you can retry the operation using '--force'"))
1288
    raise errors.OpExecError("Disk consistency error")
1289

    
1290

    
1291
class LUInstanceGrowDisk(LogicalUnit):
1292
  """Grow a disk of an instance.
1293

1294
  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.needed_locks[locking.LEVEL_NODE_RES] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      "ABSOLUTE": self.op.absolute,
      }
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    nodenames = list(instance.all_nodes)
    for node in nodenames:
      CheckNodeOnline(self, node)

    self.instance = instance

    if instance.disk_template not in constants.DTS_GROWABLE:
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing", errors.ECODE_INVAL)

    self.disk = instance.FindDisk(self.op.disk)

    if self.op.absolute:
      self.target = self.op.amount
      self.delta = self.target - self.disk.size
      if self.delta < 0:
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
                                   "current disk size (%s)" %
                                   (utils.FormatUnit(self.target, "h"),
                                    utils.FormatUnit(self.disk.size, "h")),
                                   errors.ECODE_STATE)
    else:
      self.delta = self.op.amount
      self.target = self.disk.size + self.delta
      if self.delta < 0:
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
                                   utils.FormatUnit(self.delta, "h"),
                                   errors.ECODE_INVAL)
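
    # Worked example: for a 10240 MiB disk, amount=2048 with absolute=False
    # yields delta=2048 and target=12288, while amount=12288 with
    # absolute=True yields the same delta/target pair.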

    self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))

  def _CheckDiskSpace(self, nodenames, req_vgspace):
    template = self.instance.disk_template
    if template not in (constants.DTS_NO_FREE_SPACE_CHECK):
      # TODO: check the free disk space for file, when that feature will be
      # supported
      nodes = map(self.cfg.GetNodeInfo, nodenames)
      es_nodes = filter(lambda n: IsExclusiveStorageEnabledNode(self.cfg, n),
                        nodes)
      if es_nodes:
        # With exclusive storage we need to do something smarter than just
        # looking at free space; for now, let's simply abort the operation.
        raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
                                   " is enabled", errors.ECODE_STATE)
      CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = self.disk

    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks

    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[disk])
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block device to grow")

    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
                (self.op.disk, instance.name,
                 utils.FormatUnit(self.delta, "h"),
                 utils.FormatUnit(self.target, "h")))

    # First run all grow ops in dry-run mode
    for node in instance.all_nodes:
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
                                           True, True)
      result.Raise("Dry-run grow request failed to node %s" % node)

    if wipe_disks:
      # Get disk size from primary node for wiping
      self.cfg.SetDiskID(disk, instance.primary_node)
      result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk])
      result.Raise("Failed to retrieve disk size from node '%s'" %
                   instance.primary_node)

      (disk_size_in_bytes, ) = result.payload

      if disk_size_in_bytes is None:
        raise errors.OpExecError("Failed to retrieve disk size from primary"
                                 " node '%s'" % instance.primary_node)

      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)

      assert old_disk_size >= disk.size, \
        ("Retrieved disk size too small (got %s, should be at least %s)" %
         (old_disk_size, disk.size))
    else:
      old_disk_size = None

    # We know that (as far as we can test) operations across different
    # nodes will succeed, time to run it for real on the backing storage
    for node in instance.all_nodes:
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
                                           False, True)
      result.Raise("Grow request failed to node %s" % node)

    # And now execute it for logical storage, on the primary node
    node = instance.primary_node
    self.cfg.SetDiskID(disk, node)
    result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
                                         False, False)
    result.Raise("Grow request failed to node %s" % node)

    disk.RecordGrow(self.delta)
    self.cfg.Update(instance, feedback_fn)

    # Changes have been recorded, release node lock
    ReleaseLocks(self, locking.LEVEL_NODE)

    # Downgrade lock while waiting for sync
    self.glm.downgrade(locking.LEVEL_INSTANCE)

    assert wipe_disks ^ (old_disk_size is None)

    if wipe_disks:
      assert instance.disks[self.op.disk] == disk

      # Wipe newly added disk space
      WipeDisks(self, instance,
                disks=[(self.op.disk, disk, old_disk_size)])

    if self.op.wait_for_sync:
      disk_abort = not WaitForSync(self, instance, disks=[disk])
      if disk_abort:
        self.LogWarning("Disk syncing has not returned a good status; check"
                        " the instance")
      if not instance.disks_active:
        _SafeShutdownInstanceDisks(self, instance, disks=[disk])
    elif not instance.disks_active:
      self.LogWarning("Not shutting down the disk even if the instance is"
                      " not supposed to be running because no wait for"
                      " sync mode was requested")

    assert self.owned_locks(locking.LEVEL_NODE_RES)
    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)


class LUInstanceReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check arguments.

    """
    remote_node = self.op.remote_node
    ialloc = self.op.iallocator
    if self.op.mode == constants.REPLACE_DISK_CHG:
      if remote_node is None and ialloc is None:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given", errors.ECODE_INVAL)
      else:
        CheckIAllocatorOrNode(self, "iallocator", "remote_node")

    elif remote_node is not None or ialloc is not None:
      # Not replacing the secondary
      raise errors.OpPrereqError("The iallocator and new node options can"
                                 " only be used when changing the"
                                 " secondary node", errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    assert locking.LEVEL_NODE not in self.needed_locks
    assert locking.LEVEL_NODE_RES not in self.needed_locks
    assert locking.LEVEL_NODEGROUP not in self.needed_locks

    assert self.op.iallocator is None or self.op.remote_node is None, \
      "Conflicting options"

    if self.op.remote_node is not None:
      self.op.remote_node = ExpandNodeName(self.cfg, self.op.remote_node)

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

      if self.op.iallocator is not None:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

    self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
                                   self.op.iallocator, self.op.remote_node,
                                   self.op.disks, self.op.early_release,
                                   self.op.ignore_ipolicy)

    self.tasklets = [self.replacer]

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.remote_node is None
      assert self.op.iallocator is not None
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock all groups used by instance optimistically; this requires going
      # via the node before it's locked, requiring verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_name)

    elif level == locking.LEVEL_NODE:
      if self.op.iallocator is not None:
        assert self.op.remote_node is None
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)

        # Lock member nodes of all locked groups
        self.needed_locks[locking.LEVEL_NODE] = \
          [node_name
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
           for node_name in self.cfg.GetNodeGroup(group_uuid).members]
      else:
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

        self._LockInstancesNodes()

    elif level == locking.LEVEL_NODE_RES:
      # Reuse node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        self.needed_locks[locking.LEVEL_NODE]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    instance = self.replacer.instance
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": instance.secondary_nodes[0],
      }
    env.update(BuildInstanceHookEnvByObject(self, instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self.replacer.instance
    nl = [
      self.cfg.GetMasterNode(),
      instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
            self.op.iallocator is None)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)

    return LogicalUnit.CheckPrereq(self)


class LUInstanceActivateDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              AssembleInstanceDisks(self, self.instance,
                                    ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    if self.op.wait_for_sync:
      if not WaitForSync(self, self.instance):
        self.cfg.MarkInstanceDisksInactive(self.instance.name)
        raise errors.OpExecError("Some disks of the instance are degraded!")

    return disks_info


class LUInstanceDeactivateDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    if self.op.force:
      ShutdownInstanceDisks(self, instance)
    else:
      _SafeShutdownInstanceDisks(self, instance)


def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
                               ldisk=False):
  """Check that mirrors are not degraded.

  @attention: The device has to be annotated already.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
                                                     on_primary)

  return result


def CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
  """Wrapper around L{_CheckDiskConsistencyInner}.

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
                                    ldisk=ldisk)


def _BlockdevFind(lu, node, dev, instance):
  """Wrapper around call_blockdev_find to annotate diskparams.

  @param lu: A reference to the lu object
  @param node: The node to call out
  @param dev: The device to find
  @param instance: The instance object the device belongs to
  @returns The result of the rpc call

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return lu.rpc.call_blockdev_find(node, disk)


def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
    results.append("%s%s" % (new_id, val))
  return results
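
# For example, _CreateNewStorage below calls this with extensions such as
# [".disk0_data", ".disk0_meta"]; the results are those extensions prefixed
# with a freshly generated unique ID, e.g. "<unique-id>.disk0_data".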


class TLReplaceDisks(Tasklet):
  """Replaces disks for an instance.

  Note: Locking is not within the scope of this class.

  """
  def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
               disks, early_release, ignore_ipolicy):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_name = instance_name
    self.mode = mode
    self.iallocator_name = iallocator_name
    self.remote_node = remote_node
    self.disks = disks
    self.early_release = early_release
    self.ignore_ipolicy = ignore_ipolicy

    # Runtime data
    self.instance = None
    self.new_node = None
    self.target_node = None
    self.other_node = None
    self.remote_node_info = None
    self.node_secondary_ip = None

  @staticmethod
  def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
    """Compute a new secondary node using an IAllocator.

    """
    req = iallocator.IAReqRelocate(name=instance_name,
                                   relocate_from=list(relocate_from))
    ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)

    ial.Run(iallocator_name)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (iallocator_name, ial.info),
                                 errors.ECODE_NORES)

    remote_node_name = ial.result[0]

    lu.LogInfo("Selected new secondary for instance '%s': %s",
               instance_name, remote_node_name)

    return remote_node_name

  def _FindFaultyDisks(self, node_name):
    """Wrapper for L{FindFaultyInstanceDisks}.

    """
    return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
                                   node_name, True)

  def _CheckDisksActivated(self, instance):
    """Checks if the instance disks are activated.

    @param instance: The instance to check disks
    @return: True if they are activated, False otherwise

    """
    nodes = instance.all_nodes

    for idx, dev in enumerate(instance.disks):
      for node in nodes:
        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
        self.cfg.SetDiskID(dev, node)

        result = _BlockdevFind(self, node, dev, instance)

        if result.offline:
          continue
        elif result.fail_msg or not result.payload:
          return False

    return True

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.instance_name

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances", errors.ECODE_INVAL)

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes),
                                 errors.ECODE_FAULT)

    instance = self.instance
    secondary_node = instance.secondary_nodes[0]

    if self.iallocator_name is None:
      remote_node = self.remote_node
    else:
      remote_node = self._RunAllocator(self.lu, self.iallocator_name,
                                       instance.name, instance.secondary_nodes)

    if remote_node is None:
      self.remote_node_info = None
    else:
      assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \
             "Remote node '%s' is not locked" % remote_node

      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node

    if remote_node == self.instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance", errors.ECODE_INVAL)

    if remote_node == secondary_node:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance",
                                 errors.ECODE_INVAL)

    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
                                    constants.REPLACE_DISK_CHG):
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
                                 errors.ECODE_INVAL)

    if self.mode == constants.REPLACE_DISK_AUTO:
      if not self._CheckDisksActivated(instance):
        raise errors.OpPrereqError("Please run activate-disks on instance %s"
                                   " first" % self.instance_name,
                                   errors.ECODE_STATE)
      faulty_primary = self._FindFaultyDisks(instance.primary_node)
      faulty_secondary = self._FindFaultyDisks(secondary_node)

      if faulty_primary and faulty_secondary:
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
                                   " one node and can not be repaired"
                                   " automatically" % self.instance_name,
                                   errors.ECODE_STATE)

      if faulty_primary:
        self.disks = faulty_primary
        self.target_node = instance.primary_node
        self.other_node = secondary_node
        check_nodes = [self.target_node, self.other_node]
      elif faulty_secondary:
        self.disks = faulty_secondary
        self.target_node = secondary_node
        self.other_node = instance.primary_node
        check_nodes = [self.target_node, self.other_node]
      else:
        self.disks = []
        check_nodes = []

    else:
      # Non-automatic modes
      if self.mode == constants.REPLACE_DISK_PRI:
        self.target_node = instance.primary_node
        self.other_node = secondary_node
        check_nodes = [self.target_node, self.other_node]

      elif self.mode == constants.REPLACE_DISK_SEC:
        self.target_node = secondary_node
        self.other_node = instance.primary_node
        check_nodes = [self.target_node, self.other_node]

      elif self.mode == constants.REPLACE_DISK_CHG:
        self.new_node = remote_node
        self.other_node = instance.primary_node
        self.target_node = secondary_node
        check_nodes = [self.new_node, self.other_node]

        CheckNodeNotDrained(self.lu, remote_node)
        CheckNodeVmCapable(self.lu, remote_node)

        old_node_info = self.cfg.GetNodeInfo(secondary_node)
        assert old_node_info is not None
        if old_node_info.offline and not self.early_release:
          # doesn't make sense to delay the release
          self.early_release = True
          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
                          " early-release mode", secondary_node)

      else:
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                     self.mode)

      # If not specified all disks should be replaced
      if not self.disks:
        self.disks = range(len(self.instance.disks))

    # TODO: This is ugly, but right now we can't distinguish between internally
    # submitted opcodes and external ones. We should fix that.
    if self.remote_node_info:
      # We change the node, let's verify it still meets instance policy
      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
      cluster = self.cfg.GetClusterInfo()
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              new_group_info)
      CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
                             self.cfg, ignore=self.ignore_ipolicy)

    for node in check_nodes:
      CheckNodeOnline(self.lu, node)

    touched_nodes = frozenset(node_name for node_name in [self.new_node,
                                                          self.other_node,
                                                          self.target_node]
                              if node_name is not None)

    # Release unneeded node and node resource locks
    ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    # Release any owned node group
    ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)

    # Check whether disks are valid
    for disk_idx in self.disks:
      instance.FindDisk(disk_idx)

    # Get secondary node IP addresses
    self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node)
                                  in self.cfg.GetMultiNodeInfo(touched_nodes))

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    if __debug__:
      # Verify owned locks before starting operation
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
      assert set(owned_nodes) == set(self.node_secondary_ip), \
          ("Incorrect node locks, owning %s, expected %s" %
           (owned_nodes, self.node_secondary_ip.keys()))
      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
              self.lu.owned_locks(locking.LEVEL_NODE_RES))
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
      assert list(owned_instances) == [self.instance_name], \
          "Instance '%s' not locked" % self.instance_name

      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
          "Should not own any node group lock at this point"

    if not self.disks:
      feedback_fn("No disks need replacement for instance '%s'" %
                  self.instance.name)
      return

    feedback_fn("Replacing disk(s) %s for instance '%s'" %
                (utils.CommaJoin(self.disks), self.instance.name))
    feedback_fn("Current primary node: %s" % self.instance.primary_node)
    feedback_fn("Current secondary node: %s" %
                utils.CommaJoin(self.instance.secondary_nodes))

    activate_disks = not self.instance.disks_active

    # Activate the instance disks if we're replacing them on a down instance
    if activate_disks:
      StartInstanceDisks(self.lu, self.instance, True)

    try:
      # Should we replace the secondary node?
      if self.new_node is not None:
        fn = self._ExecDrbd8Secondary
      else:
        fn = self._ExecDrbd8DiskOnly

      result = fn(feedback_fn)
    finally:
      # Deactivate the instance disks if we're replacing them on a
      # down instance
      if activate_disks:
        _SafeShutdownInstanceDisks(self.lu, self.instance)

    assert not self.lu.owned_locks(locking.LEVEL_NODE)

    if __debug__:
      # Verify owned locks
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
      nodes = frozenset(self.node_secondary_ip)
      assert ((self.early_release and not owned_nodes) or
              (not self.early_release and not (set(owned_nodes) - nodes))), \
        ("Not owning the correct locks, early_release=%s, owned=%r,"
         " nodes=%r" % (self.early_release, owned_nodes, nodes))

    return result

  def _CheckVolumeGroup(self, nodes):
    self.lu.LogInfo("Checking volume groups")

    vgname = self.cfg.GetVGName()

    # Make sure volume group exists on all involved nodes
    results = self.rpc.call_vg_list(nodes)
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")

    for node in nodes:
      res = results[node]
      res.Raise("Error checking node %s" % node)
      if vgname not in res.payload:
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
                                 (vgname, node))

  def _CheckDisksExistence(self, nodes):
    # Check disk existence
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      for node in nodes:
        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
        self.cfg.SetDiskID(dev, node)

        result = _BlockdevFind(self, node, dev, self.instance)

        msg = result.fail_msg
        if msg or not result.payload:
          if not msg:
            msg = "disk not found"
          if not self._CheckDisksActivated(self.instance):
            extra_hint = ("\nDisks seem to be not properly activated. Try"
                          " running activate-disks on the instance before"
                          " using replace-disks.")
          else:
            extra_hint = ""
          raise errors.OpExecError("Can't find disk/%d on node %s: %s%s" %
                                   (idx, node, msg, extra_hint))

  def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                      (idx, node_name))

      if not CheckDiskConsistency(self.lu, self.instance, dev, node_name,
                                  on_primary, ldisk=ldisk):
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                                 " replace disks for instance %s" %
                                 (node_name, self.instance.name))

  def _CreateNewStorage(self, node_name):
    """Create new storage on the primary or secondary node.

    This is only used for same-node replaces, not for changing the
    secondary node, hence we don't want to modify the existing disk.

    """
    iv_names = {}

    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    for idx, dev in enumerate(disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx)

      self.cfg.SetDiskID(dev, node_name)

      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
      names = _GenerateUniqueNames(self.lu, lv_names)

      (data_disk, meta_disk) = dev.children
      vg_data = data_disk.logical_id[0]
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
                             logical_id=(vg_data, names[0]),
                             params=data_disk.params)
      vg_meta = meta_disk.logical_id[0]
      lv_meta = objects.Disk(dev_type=constants.LD_LV,
                             size=constants.DRBD_META_SIZE,
                             logical_id=(vg_meta, names[1]),
                             params=meta_disk.params)

      new_lvs = [lv_data, lv_meta]
      old_lvs = [child.Copy() for child in dev.children]
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)

      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        try:
          _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
                               GetInstanceInfoText(self.instance), False,
                               excl_stor)
        except errors.DeviceCreationError, e:
          raise errors.OpExecError("Can't create block device: %s" % e.message)

    return iv_names

  def _CheckDevices(self, node_name, iv_names):
    for name, (dev, _, _) in iv_names.iteritems():
      self.cfg.SetDiskID(dev, node_name)

      result = _BlockdevFind(self, node_name, dev, self.instance)

      msg = result.fail_msg
      if msg or not result.payload:
        if not msg:
          msg = "disk not found"
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))

      if result.payload.is_degraded:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

  def _RemoveOldStorage(self, node_name, iv_names):
    for name, (_, old_lvs, _) in iv_names.iteritems():
      self.lu.LogInfo("Remove logical volumes for %s", name)

      for lv in old_lvs:
        self.cfg.SetDiskID(lv, node_name)

        msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
        if msg:
          self.lu.LogWarning("Can't remove old LV: %s", msg,
                             hint="remove unused LVs manually")

  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
    """Replace a disk on the primary or secondary for DRBD 8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.other_node, self.target_node])
    self._CheckVolumeGroup([self.target_node, self.other_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.other_node,
                                self.other_node == self.instance.primary_node,
                                False)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    iv_names = self._CreateNewStorage(self.target_node)

    # Step: for each lv, detach+rename*2+attach
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)

      result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
                                                     old_lvs)
      result.Raise("Can't detach drbd from local storage on node"
                   " %s for device %s" % (self.target_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)

      # Build the rename list based on what LVs exist on the node
      rename_old_to_new = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(self.target_node, to_ren)
        if not result.fail_msg and result.payload:
          # device exists
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))

      self.lu.LogInfo("Renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(self.target_node,
                                             rename_old_to_new)
      result.Raise("Can't rename old LVs on node %s" % self.target_node)

      # Now we rename the new LVs to the old LVs
      self.lu.LogInfo("Renaming the new LVs on the target node")
      rename_new_to_old = [(new, old.physical_id)
                           for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(self.target_node,
                                             rename_new_to_old)
      result.Raise("Can't rename new LVs on node %s" % self.target_node)

      # Intermediate steps of in memory modifications
      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        self.cfg.SetDiskID(new, self.target_node)

      # We need to modify old_lvs so that removal later removes the
      # right LVs, not the newly added ones; note that old_lvs is a
      # copy here
      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        self.cfg.SetDiskID(disk, self.target_node)

      # Now that the new lvs have the old name, we can add them to the device
      self.lu.LogInfo("Adding new mirror component on %s", self.target_node)
      result = self.rpc.call_blockdev_addchildren(self.target_node,
                                                  (dev, self.instance), new_lvs)
      msg = result.fail_msg
      if msg:
        for new_lv in new_lvs:
          msg2 = self.rpc.call_blockdev_remove(self.target_node,
                                               new_lv).fail_msg
          if msg2:
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                               hint=("cleanup manually the unused logical"
                                     " volumes"))
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)

    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # Release all node locks while waiting for sync
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)

  def _ExecDrbd8Secondary(self, feedback_fn):
    """Replace the secondary node for DRBD 8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6

    pnode = self.instance.primary_node

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.instance.primary_node])
    self._CheckVolumeGroup([self.instance.primary_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.instance.primary_node, True, True)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
    for idx, dev in enumerate(disks):
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
                      (self.new_node, idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        try:
          _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
                               True, GetInstanceInfoText(self.instance), False,
                               excl_stor)
        except errors.DeviceCreationError, e:
          raise errors.OpExecError("Can't create block device: %s" % e.message)

    # Step 4: drbd minors and drbd setup changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    minors = self.cfg.AllocateDRBDMinor([self.new_node
                                         for dev in self.instance.disks],
                                        self.instance.name)
    logging.debug("Allocated minors %r", minors)

    iv_names = {}
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
                      (self.new_node, idx))
      # create new devices on new_node; note that we create two IDs:
      # one without port, so the drbd will be activated without
      # networking information on the new node at this stage, and one
      # with network, for the latter activation in step 4
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if self.instance.primary_node == o_node1:
        p_minor = o_minor1
      else:
        assert self.instance.primary_node == o_node2, "Three-node instance?"
        p_minor = o_minor2

      new_alone_id = (self.instance.primary_node, self.new_node, None,
                      p_minor, new_minor, o_secret)
      new_net_id = (self.instance.primary_node, self.new_node, o_port,
                    p_minor, new_minor, o_secret)

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children,
                              size=dev.size,
                              params={})
      (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
                                            self.cfg)
      try:
        CreateSingleBlockDev(self.lu, self.new_node, self.instance,
                             anno_new_drbd,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)
      except errors.GenericError:
        self.cfg.ReleaseDRBDMinors(self.instance.name)
        raise

    # We have new devices, shutdown the drbd on the old secondary
    for idx, dev in enumerate(self.instance.disks):
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
      self.cfg.SetDiskID(dev, self.target_node)
      msg = self.rpc.call_blockdev_shutdown(self.target_node,
                                            (dev, self.instance)).fail_msg
      if msg:
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                           " node: %s" % (idx, msg),
                           hint=("Please cleanup this device manually as"
                                 " soon as possible"))

    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2470
    result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
2471
                                               self.instance.disks)[pnode]
2472

    
2473
    msg = result.fail_msg
2474
    if msg:
2475
      # detaches didn't succeed (unlikely)
2476
      self.cfg.ReleaseDRBDMinors(self.instance.name)
2477
      raise errors.OpExecError("Can't detach the disks from the network on"
2478
                               " old node: %s" % (msg,))
2479

    
2480
    # if we managed to detach at least one, we update all the disks of
2481
    # the instance to point to the new secondary
2482
    self.lu.LogInfo("Updating instance configuration")
2483
    for dev, _, new_logical_id in iv_names.itervalues():
2484
      dev.logical_id = new_logical_id
2485
      self.cfg.SetDiskID(dev, self.instance.primary_node)
2486

    
2487
    self.cfg.Update(self.instance, feedback_fn)
2488

    
2489
    # Release all node locks (the configuration has been updated)
2490
    ReleaseLocks(self.lu, locking.LEVEL_NODE)
2491

    
2492
    # and now perform the drbd attach
2493
    self.lu.LogInfo("Attaching primary drbds to new secondary"
2494
                    " (standalone => connected)")
2495
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2496
                                            self.new_node],
2497
                                           self.node_secondary_ip,
2498
                                           (self.instance.disks, self.instance),
2499
                                           self.instance.name,
2500
                                           False)
2501
    for to_node, to_result in result.items():
2502
      msg = to_result.fail_msg
2503
      if msg:
2504
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
2505
                           to_node, msg,
2506
                           hint=("please do a gnt-instance info to see the"
2507
                                 " status of disks"))
2508

    
2509
    cstep = itertools.count(5)
2510

    
2511
    if self.early_release:
2512
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2513
      self._RemoveOldStorage(self.target_node, iv_names)
2514
      # TODO: Check if releasing locks early still makes sense
2515
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2516
    else:
2517
      # Release all resource locks except those used by the instance
2518
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2519
                   keep=self.node_secondary_ip.keys())
2520

    
2521
    # TODO: Can the instance lock be downgraded here? Take the optional disk
2522
    # shutdown in the caller into consideration.
2523

    
2524
    # Wait for sync
2525
    # This can fail as the old devices are degraded and _WaitForSync
2526
    # does a combined result over all disks, so we don't check its return value
2527
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2528
    WaitForSync(self.lu, self.instance)
2529

    
2530
    # Check all devices manually
2531
    self._CheckDevices(self.instance.primary_node, iv_names)
2532

    
2533
    # Step: remove old storage
2534
    if not self.early_release:
2535
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2536
      self._RemoveOldStorage(self.target_node, iv_names)