lib/cmdlib/instance_storage.py @ 4b92e992

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with storage of instances."""

import itertools
import logging
import os
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import ht
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import objects
from ganeti import utils
from ganeti import opcodes
from ganeti import rpc
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
  AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeName, \
  CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
  IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks
from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
  CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy

import ganeti.masterd.instance


_DISK_TEMPLATE_NAME_PREFIX = {
  constants.DT_PLAIN: "",
  constants.DT_RBD: ".rbd",
  constants.DT_EXT: ".ext",
  }


_DISK_TEMPLATE_DEVICE_TYPE = {
  constants.DT_PLAIN: constants.LD_LV,
  constants.DT_FILE: constants.LD_FILE,
  constants.DT_SHARED_FILE: constants.LD_FILE,
  constants.DT_BLOCK: constants.LD_BLOCKDEV,
  constants.DT_RBD: constants.LD_RBD,
  constants.DT_EXT: constants.LD_EXT,
  }


def CreateSingleBlockDev(lu, node, instance, device, info, force_open,
                         excl_stor):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as an LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  """
  lu.cfg.SetDiskID(device, node)
  result = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, force_open, info,
                                       excl_stor)
  result.Raise("Can't create block device %s on"
               " node %s for instance %s" % (device, node, instance.name))
  if device.physical_id is None:
    device.physical_id = result.payload


def _CreateBlockDevInner(lu, node, instance, device, force_create,
                         info, force_open, excl_stor):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @attention: The device has to be annotated already.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has the
      CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as an LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  @return: list of created devices
  """
  created_devices = []
  try:
    if device.CreateOnSecondary():
      force_create = True

    if device.children:
      for child in device.children:
        devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
                                    info, force_open, excl_stor)
        created_devices.extend(devs)

    if not force_create:
      return created_devices

    CreateSingleBlockDev(lu, node, instance, device, info, force_open,
                         excl_stor)
    # The device has been completely created, so there is no point in keeping
    # its subdevices in the list. We just add the device itself instead.
    created_devices = [(node, device)]
    return created_devices

  except errors.DeviceCreationError, e:
    e.created_devices.extend(created_devices)
    raise e
  except errors.OpExecError, e:
    raise errors.DeviceCreationError(str(e), created_devices)


def IsExclusiveStorageEnabledNodeName(cfg, nodename):
  """Whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type nodename: string
  @param nodename: The node
  @rtype: bool
  @return: The effective value of exclusive_storage
  @raise errors.OpPrereqError: if no node exists with the given name

  """
  ni = cfg.GetNodeInfo(nodename)
  if ni is None:
    raise errors.OpPrereqError("Invalid node name %s" % nodename,
                               errors.ECODE_NOENT)
  return IsExclusiveStorageEnabledNode(cfg, ni)


def _CreateBlockDev(lu, node, instance, device, force_create, info,
                    force_open):
  """Wrapper around L{_CreateBlockDevInner}.

  This method annotates the root device first.

  """
  (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
  excl_stor = IsExclusiveStorageEnabledNodeName(lu.cfg, node)
  return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
                              force_open, excl_stor)


def _UndoCreateDisks(lu, disks_created):
  """Undo the work performed by L{CreateDisks}.

  This function is called in case of an error to undo the work of
  L{CreateDisks}.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @param disks_created: the result returned by L{CreateDisks}

  """
  for (node, disk) in disks_created:
    lu.cfg.SetDiskID(disk, node)
    result = lu.rpc.call_blockdev_remove(node, disk)
    if result.fail_msg:
      logging.warning("Failed to remove newly-created disk %s on node %s:"
                      " %s", disk, node, result.fail_msg)


def CreateDisks(lu, instance, to_skip=None, target_node=None, disks=None):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @type to_skip: list
  @param to_skip: list of indices to skip
  @type target_node: string
  @param target_node: if passed, overrides the target node for creation
  @type disks: list of L{objects.Disk}
  @param disks: the disks to create; if not specified, all the disks of the
      instance are created
  @return: information about the created disks, to be used to call
      L{_UndoCreateDisks}
  @raise errors.OpPrereqError: in case of error

  """
  info = GetInstanceInfoText(instance)
  if target_node is None:
    pnode = instance.primary_node
    all_nodes = instance.all_nodes
  else:
    pnode = target_node
    all_nodes = [pnode]

  if disks is None:
    disks = instance.disks

  if instance.disk_template in constants.DTS_FILEBASED:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)

    result.Raise("Failed to create directory '%s' on"
                 " node %s" % (file_storage_dir, pnode))

  disks_created = []
  for idx, device in enumerate(disks):
    if to_skip and idx in to_skip:
      continue
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
    for node in all_nodes:
      f_create = node == pnode
      try:
        _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
        disks_created.append((node, device))
      except errors.DeviceCreationError, e:
        logging.warning("Creating disk %s for instance '%s' failed",
                        idx, instance.name)
        disks_created.extend(e.created_devices)
        _UndoCreateDisks(lu, disks_created)
        raise errors.OpExecError(e.message)
  return disks_created
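
# Illustrative usage sketch (not part of the original module): callers are
# expected to pair CreateDisks with _UndoCreateDisks so that a failure in a
# later step removes the newly-created block devices, e.g.:
#
#   disks_created = CreateDisks(self, instance)
#   try:
#     WipeDisks(self, instance)
#   except errors.OpExecError:
#     _UndoCreateDisks(self, disks_created)
#     raise
#
# (WipeOrCleanupDisks below wraps exactly this pattern.)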


def ComputeDiskSizePerVG(disk_template, disks):
  """Compute disk size requirements in the volume group

  """
  def _compute(disks, payload):
    """Universal algorithm.

    """
    vgs = {}
    for disk in disks:
      vg_name = disk[constants.IDISK_VG]
      vgs[vg_name] = (vgs.get(vg_name, 0) +
                      disk[constants.IDISK_SIZE] + payload)

    return vgs

  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: {},
    constants.DT_PLAIN: _compute(disks, 0),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
    constants.DT_FILE: {},
    constants.DT_SHARED_FILE: {},
    }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
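
# Illustrative example (not part of the original module): for two plain
# (DT_PLAIN) disks of 1024 MiB and 2048 MiB in volume group "xenvg",
# ComputeDiskSizePerVG returns {"xenvg": 3072}; with DT_DRBD8 an extra
# constants.DRBD_META_SIZE MiB per disk is added for the DRBD metadata.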


def ComputeDisks(op, default_vg):
  """Computes the instance disks.

  @param op: The instance opcode
  @param default_vg: The default_vg to assume

  @return: The computed disks

  """
  disks = []
  for disk in op.disks:
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
    if mode not in constants.DISK_ACCESS_SET:
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                 mode, errors.ECODE_INVAL)
    size = disk.get(constants.IDISK_SIZE, None)
    if size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      size = int(size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                 errors.ECODE_INVAL)

    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
    if ext_provider and op.disk_template != constants.DT_EXT:
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
                                 " disk template, not %s" %
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
                                  op.disk_template), errors.ECODE_INVAL)

    data_vg = disk.get(constants.IDISK_VG, default_vg)
    name = disk.get(constants.IDISK_NAME, None)
    if name is not None and name.lower() == constants.VALUE_NONE:
      name = None
    new_disk = {
      constants.IDISK_SIZE: size,
      constants.IDISK_MODE: mode,
      constants.IDISK_VG: data_vg,
      constants.IDISK_NAME: name,
      }

    if constants.IDISK_METAVG in disk:
      new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG]
    if constants.IDISK_ADOPT in disk:
      new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]

    # For extstorage, demand the `provider' option and add any
    # additional parameters (ext-params) to the dict
    if op.disk_template == constants.DT_EXT:
      if ext_provider:
        new_disk[constants.IDISK_PROVIDER] = ext_provider
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            new_disk[key] = disk[key]
      else:
        raise errors.OpPrereqError("Missing provider for template '%s'" %
                                   constants.DT_EXT, errors.ECODE_INVAL)

    disks.append(new_disk)

  return disks
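
# Illustrative example (not part of the original module): given
# default_vg="xenvg", an opcode disk entry {"size": "1024", "mode": "rw"}
# is normalized by ComputeDisks into
# {IDISK_SIZE: 1024, IDISK_MODE: "rw", IDISK_VG: "xenvg", IDISK_NAME: None}.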


def CheckRADOSFreeSpace():
  """Compute disk size requirements inside the RADOS cluster.

  """
  # For the RADOS cluster we assume there is always enough space.
  pass


def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
                         iv_name, p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  assert len(vgnames) == len(names) == 2
  port = lu.cfg.AllocatePort()
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())

  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgnames[0], names[0]),
                          params={})
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  dev_meta = objects.Disk(dev_type=constants.LD_LV,
                          size=constants.DRBD_META_SIZE,
                          logical_id=(vgnames[1], names[1]),
                          params={})
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name, params={})
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  return drbd_dev
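
# Illustrative note (not part of the original module): the returned disk is a
# small tree -- the LD_DRBD8 parent carries the network logical_id
# (primary, secondary, port, p_minor, s_minor, shared_secret), and its two
# LD_LV children hold the data volume of `size` MiB and the
# constants.DRBD_META_SIZE metadata volume.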


def GenerateDiskTemplate(
  lu, template_name, instance_name, primary_node, secondary_nodes,
  disk_info, file_storage_dir, file_driver, base_index,
  feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
  _req_shr_file_storage=opcodes.RequireSharedFileStorage):
  """Generate the entire disk layout for a given template type.

  """
  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []

  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, remote_node] * len(disk_info), instance_name)

    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
                                                       full_disk_params)
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]

    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      data_vg = disk.get(constants.IDISK_VG, vgname)
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                      disk[constants.IDISK_SIZE],
                                      [data_vg, meta_vg],
                                      names[idx * 2:idx * 2 + 2],
                                      "disk/%d" % disk_index,
                                      minors[idx * 2], minors[idx * 2 + 1])
      disk_dev.mode = disk[constants.IDISK_MODE]
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disks.append(disk_dev)
  else:
    if secondary_nodes:
      raise errors.ProgrammerError("Wrong template configuration")

    if template_name == constants.DT_FILE:
      _req_file_storage()
    elif template_name == constants.DT_SHARED_FILE:
      _req_shr_file_storage()

    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
    if name_prefix is None:
      names = None
    else:
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
                                        (name_prefix, base_index + i)
                                        for i in range(disk_count)])

    if template_name == constants.DT_PLAIN:

      def logical_id_fn(idx, _, disk):
        vg = disk.get(constants.IDISK_VG, vgname)
        return (vg, names[idx])

    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
      logical_id_fn = \
        lambda _, disk_index, disk: (file_driver,
                                     "%s/disk%d" % (file_storage_dir,
                                                    disk_index))
    elif template_name == constants.DT_BLOCK:
      logical_id_fn = \
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
                                       disk[constants.IDISK_ADOPT])
    elif template_name == constants.DT_RBD:
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
    elif template_name == constants.DT_EXT:
      def logical_id_fn(idx, _, disk):
        provider = disk.get(constants.IDISK_PROVIDER, None)
        if provider is None:
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
                                       " not found", constants.DT_EXT,
                                       constants.IDISK_PROVIDER)
        return (provider, names[idx])
    else:
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)

    dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]

    for idx, disk in enumerate(disk_info):
      params = {}
      # Only for the Ext template add disk_info to params
      if template_name == constants.DT_EXT:
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            params[key] = disk[key]
      disk_index = idx + base_index
      size = disk[constants.IDISK_SIZE]
      feedback_fn("* disk %s, size %s" %
                  (disk_index, utils.FormatUnit(size, "h")))
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
                              logical_id=logical_id_fn(idx, disk_index, disk),
                              iv_name="disk/%d" % disk_index,
                              mode=disk[constants.IDISK_MODE],
                              params=params)
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
      disks.append(disk_dev)

  return disks
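
# Illustrative example (not part of the original module): for DT_PLAIN with a
# single disk and base_index 0, GenerateDiskTemplate returns one objects.Disk
# with dev_type=constants.LD_LV, iv_name="disk/0" and logical_id set to
# (<vg name>, <generated LV name>); DRBD8 instead goes through
# _GenerateDRBD8Branch above.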


class LUInstanceRecreateDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  _MODIFYABLE = compat.UniqueFrozenset([
    constants.IDISK_SIZE,
    constants.IDISK_MODE,
    ])

  # New or changed disk parameters may have different semantics
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
    constants.IDISK_ADOPT,

    # TODO: Implement support for changing the VG while recreating
    constants.IDISK_VG,
    constants.IDISK_METAVG,
    constants.IDISK_PROVIDER,
    constants.IDISK_NAME,
    ]))

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)

    # FIXME
    # The allocator should actually run in "relocate" mode, but current
    # allocators don't support relocating all the nodes of an instance at
    # the same time. As a workaround we use "allocate" mode, but this is
    # suboptimal for two reasons:
    # - The instance name passed to the allocator is present in the list of
    #   existing instances, so there could be a conflict within the
    #   internal structures of the allocator. This doesn't happen with the
    #   current allocators, but it's a liability.
    # - The allocator counts the resources used by the instance twice: once
    #   because the instance exists already, and once because it tries to
    #   allocate a new instance.
    # The allocator could choose some of the nodes on which the instance is
    # running, but that's not a problem. If the instance nodes are broken,
    # they should already be marked as drained or offline, and hence
    # skipped by the allocator. If instance disks have been lost for other
    # reasons, then recreating the disks on the same nodes should be fine.
    disk_template = self.instance.disk_template
    spindle_use = be_full[constants.BE_SPINDLE_USE]
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
                                        disk_template=disk_template,
                                        tags=list(self.instance.GetTags()),
                                        os=self.instance.os,
                                        nics=[{}],
                                        vcpus=be_full[constants.BE_VCPUS],
                                        memory=be_full[constants.BE_MAXMEM],
                                        spindle_use=spindle_use,
                                        disks=[{constants.IDISK_SIZE: d.size,
                                                constants.IDISK_MODE: d.mode}
                                               for d in self.instance.disks],
                                        hypervisor=self.instance.hypervisor,
                                        node_whitelist=None)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    assert req.RequiredNodes() == len(self.instance.all_nodes)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    self.op.nodes = ial.result
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 utils.CommaJoin(ial.result))

  def CheckArguments(self):
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
      # Normalize and convert deprecated list of disk indices
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]

    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
    if duplicates:
      raise errors.OpPrereqError("Some disks have been specified more than"
                                 " once: %s" % utils.CommaJoin(duplicates),
                                 errors.ECODE_INVAL)

    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
    # when neither iallocator nor nodes are specified
    if self.op.iallocator or self.op.nodes:
      CheckIAllocatorOrNode(self, "iallocator", "nodes")

    for (idx, params) in self.op.disks:
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
      if unsupported:
        raise errors.OpPrereqError("Parameters for disk %s try to change"
                                   " unmodifyable parameter(s): %s" %
                                   (idx, utils.CommaJoin(unsupported)),
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    if self.op.nodes:
      self.op.nodes = [ExpandNodeName(self.cfg, n) for n in self.op.nodes]
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      if self.op.iallocator:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.iallocator is not None
      assert not self.op.nodes
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock the primary group used by the instance optimistically; this
      # requires going via the node before it's locked, requiring
      # verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)

    elif level == locking.LEVEL_NODE:
      # If an allocator is used, then we lock all the nodes in the current
      # instance group, as we don't know yet which ones will be selected;
      # if we replace the nodes without using an allocator, locks are
      # already declared in ExpandNames; otherwise, we need to lock all the
      # instance nodes for disk re-creation
      if self.op.iallocator:
        assert not self.op.nodes
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1

        # Lock member nodes of the group of the primary node
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
          self.needed_locks[locking.LEVEL_NODE].extend(
            self.cfg.GetNodeGroup(group_uuid).members)

        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
      elif not self.op.nodes:
        self._LockInstancesNodes(primary_only=False)
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    return BuildInstanceHookEnvByObject(self, self.instance)

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    if self.op.nodes:
      if len(self.op.nodes) != len(instance.all_nodes):
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
                                   " %d replacement nodes were specified" %
                                   (instance.name, len(instance.all_nodes),
                                    len(self.op.nodes)),
                                   errors.ECODE_INVAL)
      assert instance.disk_template != constants.DT_DRBD8 or \
             len(self.op.nodes) == 2
      assert instance.disk_template != constants.DT_PLAIN or \
             len(self.op.nodes) == 1
      primary_node = self.op.nodes[0]
    else:
      primary_node = instance.primary_node
    if not self.op.iallocator:
      CheckNodeOnline(self, primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name, errors.ECODE_INVAL)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      # Node group locks are acquired only for the primary node (and only
      # when the allocator is used)
      CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
                              primary_only=True)

    # if we replace nodes *and* the old primary is offline, we don't
    # check the instance state
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
    if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                         msg="cannot recreate disks")

    if self.op.disks:
      self.disks = dict(self.op.disks)
    else:
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))

    maxidx = max(self.disks.keys())
    if maxidx >= len(instance.disks):
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
                                 errors.ECODE_INVAL)

    if ((self.op.nodes or self.op.iallocator) and
         sorted(self.disks.keys()) != range(len(instance.disks))):
      raise errors.OpPrereqError("Can't recreate disks partially and"
                                 " change the nodes at the same time",
                                 errors.ECODE_INVAL)

    self.instance = instance

    if self.op.iallocator:
      self._RunAllocator()
      # Release unneeded node and node resource locks
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)

    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

  def Exec(self, feedback_fn):
    """Recreate the disks.

    """
    instance = self.instance

    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    to_skip = []
    mods = [] # keeps track of needed changes

    for idx, disk in enumerate(instance.disks):
      try:
        changes = self.disks[idx]
      except KeyError:
        # Disk should not be recreated
        to_skip.append(idx)
        continue

      # update secondaries for disks, if needed
      if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
        # need to update the nodes and minors
        assert len(self.op.nodes) == 2
        assert len(disk.logical_id) == 6 # otherwise disk internals
                                         # have changed
        (_, _, old_port, _, _, old_secret) = disk.logical_id
        new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
        new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
                  new_minors[0], new_minors[1], old_secret)
        assert len(disk.logical_id) == len(new_id)
      else:
        new_id = None

      mods.append((idx, new_id, changes))

    # now that we have passed all asserts above, we can apply the mods
    # in a single run (to avoid partial changes)
    for idx, new_id, changes in mods:
      disk = instance.disks[idx]
      if new_id is not None:
        assert disk.dev_type == constants.LD_DRBD8
        disk.logical_id = new_id
      if changes:
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
                    mode=changes.get(constants.IDISK_MODE, None))

    # change primary node, if needed
    if self.op.nodes:
      instance.primary_node = self.op.nodes[0]
      self.LogWarning("Changing the instance's nodes, you will have to"
                      " remove any disks left on the older nodes manually")

    if self.op.nodes:
      self.cfg.Update(instance, feedback_fn)

    # All touched nodes must be locked
    mylocks = self.owned_locks(locking.LEVEL_NODE)
    assert mylocks.issuperset(frozenset(instance.all_nodes))
    new_disks = CreateDisks(self, instance, to_skip=to_skip)

    # TODO: Release node locks before wiping, or explain why it's not possible
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
      wipedisks = [(idx, disk, 0)
                   for (idx, disk) in enumerate(instance.disks)
                   if idx not in to_skip]
      WipeOrCleanupDisks(self, instance, disks=wipedisks, cleanup=new_disks)


def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
  """Checks if nodes have enough free disk space in the specified VG.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type nodenames: C{list}
  @param nodenames: the list of node names to check
  @type vg: C{str}
  @param vg: the volume group to check
  @type requested: C{int}
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
  # FIXME: This maps everything to storage type 'lvm-vg' to maintain
  # the current functionality. Refactor to make it more flexible.
  nodeinfo = lu.rpc.call_node_info(nodenames, [(constants.ST_LVM_VG, vg)], None,
                                   es_flags)
  for node in nodenames:
    info = nodeinfo[node]
    info.Raise("Cannot get current information from node %s" % node,
               prereq=True, ecode=errors.ECODE_ENVIRON)
    (_, (vg_info, ), _) = info.payload
    vg_free = vg_info.get("vg_free", None)
    if not isinstance(vg_free, int):
      raise errors.OpPrereqError("Can't compute free disk space on node"
                                 " %s for vg %s, result was '%s'" %
                                 (node, vg, vg_free), errors.ECODE_ENVIRON)
    if requested > vg_free:
      raise errors.OpPrereqError("Not enough disk space on target node %s"
                                 " vg %s: required %d MiB, available %d MiB" %
                                 (node, vg, requested, vg_free),
                                 errors.ECODE_NORES)


def CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
  """Checks if nodes have enough free disk space in all the VGs.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type nodenames: C{list}
  @param nodenames: the list of node names to check
  @type req_sizes: C{dict}
  @param req_sizes: the hash of vg and corresponding amount of disk in
      MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  for vg, req_size in req_sizes.items():
    _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
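
# Illustrative example (not part of the original module): a req_sizes mapping
# such as {"xenvg": 3072, "metavg": 256} makes CheckNodesFreeDiskPerVG verify
# that every node in nodenames has at least 3072 MiB free in VG "xenvg" and
# 256 MiB free in VG "metavg"; this is typically the output of
# ComputeDiskSizePerVG above.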


def _DiskSizeInBytesToMebibytes(lu, size):
  """Converts a disk size in bytes to mebibytes.

  Warns and rounds up if the size isn't an even multiple of 1 MiB.

  """
  (mib, remainder) = divmod(size, 1024 * 1024)

  if remainder != 0:
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
                  " to not overwrite existing data (%s bytes will not be"
                  " wiped)", (1024 * 1024) - remainder)
    mib += 1

  return mib
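
# Worked example (illustrative, not part of the original module): for
# size = 1 GiB + 1 byte = 1073741825, divmod gives (1024, 1), so a warning
# about the 1048575 unwiped bytes is logged and 1025 MiB is returned.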


def _CalcEta(time_taken, written, total_size):
  """Calculates the ETA based on size written and total size.

  @param time_taken: The time taken so far
  @param written: amount written so far
  @param total_size: The total size of data to be written
  @return: The remaining time in seconds

  """
  avg_time = time_taken / float(written)
  return (total_size - written) * avg_time
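
# Worked example (illustrative, not part of the original module): after 60
# seconds with 512 MiB of a 2048 MiB disk written, the average is
# 60 / 512.0 seconds per MiB, so the ETA is (2048 - 512) * (60 / 512.0)
# = 180.0 seconds.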


def WipeDisks(lu, instance, disks=None):
  """Wipes instance disks.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
  @param disks: Disk details; tuple contains disk index, disk object and the
    start offset

  """
  node = instance.primary_node

  if disks is None:
    disks = [(idx, disk, 0)
             for (idx, disk) in enumerate(instance.disks)]

  for (_, device, _) in disks:
    lu.cfg.SetDiskID(device, node)

  logging.info("Pausing synchronization of disks of instance '%s'",
               instance.name)
  result = lu.rpc.call_blockdev_pause_resume_sync(node,
                                                  (map(compat.snd, disks),
                                                   instance),
                                                  True)
  result.Raise("Failed to pause disk synchronization on node '%s'" % node)

  for idx, success in enumerate(result.payload):
    if not success:
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
                   " failed", idx, instance.name)

  try:
    for (idx, device, offset) in disks:
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
      wipe_chunk_size = \
        int(min(constants.MAX_WIPE_CHUNK,
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))

      size = device.size
      last_output = 0
      start_time = time.time()

      if offset == 0:
        info_text = ""
      else:
        info_text = (" (from %s to %s)" %
                     (utils.FormatUnit(offset, "h"),
                      utils.FormatUnit(size, "h")))

      lu.LogInfo("* Wiping disk %s%s", idx, info_text)

      logging.info("Wiping disk %d for instance %s on node %s using"
                   " chunk size %s", idx, instance.name, node, wipe_chunk_size)

      while offset < size:
        wipe_size = min(wipe_chunk_size, size - offset)

        logging.debug("Wiping disk %d, offset %s, chunk %s",
                      idx, offset, wipe_size)

        result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
                                           wipe_size)
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
                     (idx, offset, wipe_size))

        now = time.time()
        offset += wipe_size
        if now - last_output >= 60:
          eta = _CalcEta(now - start_time, offset, size)
          lu.LogInfo(" - done: %.1f%% ETA: %s",
                     offset / float(size) * 100, utils.FormatSeconds(eta))
          last_output = now
  finally:
    logging.info("Resuming synchronization of disks for instance '%s'",
                 instance.name)

    result = lu.rpc.call_blockdev_pause_resume_sync(node,
                                                    (map(compat.snd, disks),
                                                     instance),
                                                    False)

    if result.fail_msg:
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
                    node, result.fail_msg)
    else:
      for idx, success in enumerate(result.payload):
        if not success:
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
                        " failed", idx, instance.name)


def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
  """Wrapper for L{WipeDisks} that handles errors.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @param disks: see L{WipeDisks}
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
      case of error
  @raise errors.OpPrereqError: in case of failure

  """
  try:
    WipeDisks(lu, instance, disks=disks)
  except errors.OpExecError:
    logging.warning("Wiping disks for instance '%s' failed",
                    instance.name)
    _UndoCreateDisks(lu, cleanup)
    raise
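
# Illustrative note (not part of the original module): the wipe proceeds in
# chunks of MIN_WIPE_CHUNK_PERCENT of the disk, capped at MAX_WIPE_CHUNK.
# Assuming the usual constants (10 percent and a 1 GiB cap), a 50 GiB disk
# would be wiped in 1024 MiB chunks, with progress and ETA logged at most
# once a minute.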


def ExpandCheckDisks(instance, disks):
  """Return the instance disks selected by the disks list

  @type disks: list of L{objects.Disk} or None
  @param disks: selected disks
  @rtype: list of L{objects.Disk}
  @return: selected instance disks to act on

  """
  if disks is None:
    return instance.disks
  else:
    if not set(disks).issubset(instance.disks):
      raise errors.ProgrammerError("Can only act on disks belonging to the"
                                   " target instance")
    return disks


def WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)

  node = instance.primary_node

  for dev in disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.LogInfo("- device %s: %5.2f%% done, %s",
                   disks[i].iv_name, mstat.sync_percent, rem_time)

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.LogInfo("Instance %s's disks are in sync", instance.name)

  return not cumul_degraded


def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is false, errors on the primary node are not
  ignored.

  """
  all_result = True
  disks = ExpandCheckDisks(instance, disks)

  for disk in disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if ((node == instance.primary_node and not ignore_primary) or
            (node != instance.primary_node and not result.offline)):
          all_result = False
  return all_result


def _SafeShutdownInstanceDisks(lu, instance, disks=None):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
  ShutdownInstanceDisks(lu, instance, disks=disks)


def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
                           ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we assemble
  @type disks: list of L{objects.Disk} or None
  @param disks: which disks to assemble (or all, if None)
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: False if the operation failed, otherwise a list of
      (host, instance_visible_name, node_visible_name)
      with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  disks = ExpandCheckDisks(instance, disks)

  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for idx, inst_disk in enumerate(disks):
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
                                             False, idx)
      msg = result.fail_msg
      if msg:
        is_offline_secondary = (node in instance.secondary_nodes and
                                result.offline)
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=False, pass=1): %s",
                      inst_disk.iv_name, node, msg)
        if not (ignore_secondaries or is_offline_secondary):
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for idx, inst_disk in enumerate(disks):
    dev_path = None

    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
                                             True, idx)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=True, pass=2): %s",
                      inst_disk.iv_name, node, msg)
        disks_ok = False
      else:
        dev_path = result.payload

    device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
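
# Illustrative usage sketch (not part of the original module), mirroring
# StartInstanceDisks below:
#
#   disks_ok, device_info = AssembleInstanceDisks(self, instance)
#   if not disks_ok:
#     ShutdownInstanceDisks(self, instance)
#     raise errors.OpExecError("Disk consistency error")
#
# device_info contains one (primary node, iv_name, device path) tuple per
# disk.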


def StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = AssembleInstanceDisks(lu, instance,
                                      ignore_secondaries=force)
  if not disks_ok:
    ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.LogWarning("",
                    hint=("If the message above refers to a secondary node,"
                          " you can retry the operation using '--force'"))
    raise errors.OpExecError("Disk consistency error")


class LUInstanceGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.needed_locks[locking.LEVEL_NODE_RES] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      "ABSOLUTE": self.op.absolute,
      }
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    nodenames = list(instance.all_nodes)
    for node in nodenames:
      CheckNodeOnline(self, node)

    self.instance = instance

    if instance.disk_template not in constants.DTS_GROWABLE:
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing", errors.ECODE_INVAL)

    self.disk = instance.FindDisk(self.op.disk)

    if self.op.absolute:
      self.target = self.op.amount
      self.delta = self.target - self.disk.size
      if self.delta < 0:
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
                                   "current disk size (%s)" %
                                   (utils.FormatUnit(self.target, "h"),
                                    utils.FormatUnit(self.disk.size, "h")),
                                   errors.ECODE_STATE)
    else:
      self.delta = self.op.amount
      self.target = self.disk.size + self.delta
      if self.delta < 0:
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
                                   utils.FormatUnit(self.delta, "h"),
                                   errors.ECODE_INVAL)

    self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))
1362

    
1363
  def _CheckDiskSpace(self, nodenames, req_vgspace):
1364
    template = self.instance.disk_template
1365
    if template not in (constants.DTS_NO_FREE_SPACE_CHECK):
1366
      # TODO: check the free disk space for file, when that feature will be
1367
      # supported
1368
      nodes = map(self.cfg.GetNodeInfo, nodenames)
1369
      es_nodes = filter(lambda n: IsExclusiveStorageEnabledNode(self.cfg, n),
1370
                        nodes)
1371
      if es_nodes:
1372
        # With exclusive storage we need to something smarter than just looking
1373
        # at free space; for now, let's simply abort the operation.
1374
        raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
1375
                                   " is enabled", errors.ECODE_STATE)
1376
      CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)
1377

    
1378
  def Exec(self, feedback_fn):
1379
    """Execute disk grow.
1380

1381
    """
1382
    instance = self.instance
1383
    disk = self.disk
1384

    
1385
    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1386
    assert (self.owned_locks(locking.LEVEL_NODE) ==
1387
            self.owned_locks(locking.LEVEL_NODE_RES))
1388

    
1389
    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1390

    
1391
    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[disk])
1392
    if not disks_ok:
1393
      raise errors.OpExecError("Cannot activate block device to grow")
1394

    
1395
    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1396
                (self.op.disk, instance.name,
1397
                 utils.FormatUnit(self.delta, "h"),
1398
                 utils.FormatUnit(self.target, "h")))
1399

    
1400
    # First run all grow ops in dry-run mode
1401
    for node in instance.all_nodes:
1402
      self.cfg.SetDiskID(disk, node)
1403
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1404
                                           True, True)
1405
      result.Raise("Dry-run grow request failed to node %s" % node)
1406

    
1407
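    # When prealloc_wipe_disks is set, remember the size before the grow so
    # that later only the newly added disk space has to be wiped.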
    if wipe_disks:
      # Get disk size from primary node for wiping
      result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk])
      result.Raise("Failed to retrieve disk size from node '%s'" %
                   instance.primary_node)

      (disk_size_in_bytes, ) = result.payload

      if disk_size_in_bytes is None:
        raise errors.OpExecError("Failed to retrieve disk size from primary"
                                 " node '%s'" % instance.primary_node)

      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)

      assert old_disk_size >= disk.size, \
        ("Retrieved disk size too small (got %s, should be at least %s)" %
         (old_disk_size, disk.size))
    else:
      old_disk_size = None

    # We know that (as far as we can test) operations across different
    # nodes will succeed, time to run it for real on the backing storage
    for node in instance.all_nodes:
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
                                           False, True)
      result.Raise("Grow request failed to node %s" % node)

    # And now execute it for logical storage, on the primary node
    node = instance.primary_node
    self.cfg.SetDiskID(disk, node)
    result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
                                         False, False)
    result.Raise("Grow request failed to node %s" % node)

    disk.RecordGrow(self.delta)
    self.cfg.Update(instance, feedback_fn)

    # Changes have been recorded, release node lock
    ReleaseLocks(self, locking.LEVEL_NODE)

    # Downgrade lock while waiting for sync
    self.glm.downgrade(locking.LEVEL_INSTANCE)

    assert wipe_disks ^ (old_disk_size is None)

    if wipe_disks:
      assert instance.disks[self.op.disk] == disk

      # Wipe newly added disk space
      WipeDisks(self, instance,
                disks=[(self.op.disk, disk, old_disk_size)])

    if self.op.wait_for_sync:
      disk_abort = not WaitForSync(self, instance, disks=[disk])
      if disk_abort:
        self.LogWarning("Disk syncing has not returned a good status; check"
                        " the instance")
      if instance.admin_state != constants.ADMINST_UP:
        _SafeShutdownInstanceDisks(self, instance, disks=[disk])
    elif instance.admin_state != constants.ADMINST_UP:
      self.LogWarning("Not shutting down the disk even if the instance is"
                      " not supposed to be running because no wait for"
                      " sync mode was requested")

    assert self.owned_locks(locking.LEVEL_NODE_RES)
    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)


class LUInstanceReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check arguments.

    """
    remote_node = self.op.remote_node
    ialloc = self.op.iallocator
    if self.op.mode == constants.REPLACE_DISK_CHG:
      if remote_node is None and ialloc is None:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given", errors.ECODE_INVAL)
      else:
        CheckIAllocatorOrNode(self, "iallocator", "remote_node")

    elif remote_node is not None or ialloc is not None:
      # Not replacing the secondary
      raise errors.OpPrereqError("The iallocator and new node options can"
                                 " only be used when changing the"
                                 " secondary node", errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    assert locking.LEVEL_NODE not in self.needed_locks
    assert locking.LEVEL_NODE_RES not in self.needed_locks
    assert locking.LEVEL_NODEGROUP not in self.needed_locks

    assert self.op.iallocator is None or self.op.remote_node is None, \
      "Conflicting options"

    if self.op.remote_node is not None:
      self.op.remote_node = ExpandNodeName(self.cfg, self.op.remote_node)

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8Dev.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

      if self.op.iallocator is not None:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

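    # The actual replacement work is delegated to the TLReplaceDisks tasklet;
    # this LU mainly deals with argument checking, locking and hooks.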
    self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
                                   self.op.iallocator, self.op.remote_node,
                                   self.op.disks, self.op.early_release,
                                   self.op.ignore_ipolicy)

    self.tasklets = [self.replacer]

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.remote_node is None
      assert self.op.iallocator is not None
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock all groups used by instance optimistically; this requires going
      # via the node before it's locked, requiring verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_name)

    elif level == locking.LEVEL_NODE:
      if self.op.iallocator is not None:
        assert self.op.remote_node is None
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)

        # Lock member nodes of all locked groups
        self.needed_locks[locking.LEVEL_NODE] = \
          [node_name
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
           for node_name in self.cfg.GetNodeGroup(group_uuid).members]
      else:
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

        self._LockInstancesNodes()

    elif level == locking.LEVEL_NODE_RES:
      # Reuse node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        self.needed_locks[locking.LEVEL_NODE]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    instance = self.replacer.instance
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": instance.secondary_nodes[0],
      }
    env.update(BuildInstanceHookEnvByObject(self, instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self.replacer.instance
    nl = [
      self.cfg.GetMasterNode(),
      instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
            self.op.iallocator is None)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)

    return LogicalUnit.CheckPrereq(self)


class LUInstanceActivateDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              AssembleInstanceDisks(self, self.instance,
                                    ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    if self.op.wait_for_sync:
      if not WaitForSync(self, self.instance):
        raise errors.OpExecError("Some disks of the instance are degraded!")

    return disks_info


class LUInstanceDeactivateDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    if self.op.force:
      ShutdownInstanceDisks(self, instance)
    else:
      _SafeShutdownInstanceDisks(self, instance)


def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
                               ldisk=False):
  """Check that mirrors are not degraded.

  @attention: The device has to be annotated already.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

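  # Child devices (the backing LVs) are checked as well; the recursion always
  # uses the full is_degraded test, since ldisk is not passed down.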
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
                                                     on_primary)

  return result


def CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
  """Wrapper around L{_CheckDiskConsistencyInner}.

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
                                    ldisk=ldisk)


def _BlockdevFind(lu, node, dev, instance):
  """Wrapper around call_blockdev_find to annotate diskparams.

  @param lu: A reference to the lu object
  @param node: The node to call out
  @param dev: The device to find
  @param instance: The instance object the device belongs to
  @returns The result of the rpc call

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return lu.rpc.call_blockdev_find(node, disk)


def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
    results.append("%s%s" % (new_id, val))
  return results


class TLReplaceDisks(Tasklet):
  """Replaces disks for an instance.

  Note: Locking is not within the scope of this class.

  """
  def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
               disks, early_release, ignore_ipolicy):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_name = instance_name
    self.mode = mode
    self.iallocator_name = iallocator_name
    self.remote_node = remote_node
    self.disks = disks
    self.early_release = early_release
    self.ignore_ipolicy = ignore_ipolicy

    # Runtime data
    self.instance = None
    self.new_node = None
    self.target_node = None
    self.other_node = None
    self.remote_node_info = None
    self.node_secondary_ip = None

  @staticmethod
  def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
    """Compute a new secondary node using an IAllocator.

    """
    req = iallocator.IAReqRelocate(name=instance_name,
                                   relocate_from=list(relocate_from))
    ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)

    ial.Run(iallocator_name)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (iallocator_name, ial.info),
                                 errors.ECODE_NORES)

    remote_node_name = ial.result[0]

    lu.LogInfo("Selected new secondary for instance '%s': %s",
               instance_name, remote_node_name)

    return remote_node_name

  def _FindFaultyDisks(self, node_name):
    """Wrapper for L{FindFaultyInstanceDisks}.

    """
    return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
                                   node_name, True)

  def _CheckDisksActivated(self, instance):
    """Checks if the instance disks are activated.

    @param instance: The instance to check disks
    @return: True if they are activated, False otherwise

    """
    nodes = instance.all_nodes

    for idx, dev in enumerate(instance.disks):
      for node in nodes:
        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
        self.cfg.SetDiskID(dev, node)

        result = _BlockdevFind(self, node, dev, instance)

        if result.offline:
          continue
        elif result.fail_msg or not result.payload:
          return False

    return True

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.instance_name

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances", errors.ECODE_INVAL)

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes),
                                 errors.ECODE_FAULT)

    instance = self.instance
    secondary_node = instance.secondary_nodes[0]

    if self.iallocator_name is None:
      remote_node = self.remote_node
    else:
      remote_node = self._RunAllocator(self.lu, self.iallocator_name,
                                       instance.name, instance.secondary_nodes)

    if remote_node is None:
      self.remote_node_info = None
    else:
      assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \
             "Remote node '%s' is not locked" % remote_node

      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node

    if remote_node == self.instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance", errors.ECODE_INVAL)

    if remote_node == secondary_node:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance",
                                 errors.ECODE_INVAL)

    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
                                    constants.REPLACE_DISK_CHG):
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
                                 errors.ECODE_INVAL)

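    # The mode handling below fills in the node roles: target_node is the
    # node whose storage gets replaced, other_node the peer that is left
    # untouched, and new_node is only set when the secondary is being changed.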
    if self.mode == constants.REPLACE_DISK_AUTO:
      if not self._CheckDisksActivated(instance):
        raise errors.OpPrereqError("Please run activate-disks on instance %s"
                                   " first" % self.instance_name,
                                   errors.ECODE_STATE)
      faulty_primary = self._FindFaultyDisks(instance.primary_node)
      faulty_secondary = self._FindFaultyDisks(secondary_node)

      if faulty_primary and faulty_secondary:
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
                                   " one node and can not be repaired"
                                   " automatically" % self.instance_name,
                                   errors.ECODE_STATE)

      if faulty_primary:
        self.disks = faulty_primary
        self.target_node = instance.primary_node
        self.other_node = secondary_node
        check_nodes = [self.target_node, self.other_node]
      elif faulty_secondary:
        self.disks = faulty_secondary
        self.target_node = secondary_node
        self.other_node = instance.primary_node
        check_nodes = [self.target_node, self.other_node]
      else:
        self.disks = []
        check_nodes = []

    else:
      # Non-automatic modes
      if self.mode == constants.REPLACE_DISK_PRI:
        self.target_node = instance.primary_node
        self.other_node = secondary_node
        check_nodes = [self.target_node, self.other_node]

      elif self.mode == constants.REPLACE_DISK_SEC:
        self.target_node = secondary_node
        self.other_node = instance.primary_node
        check_nodes = [self.target_node, self.other_node]

      elif self.mode == constants.REPLACE_DISK_CHG:
        self.new_node = remote_node
        self.other_node = instance.primary_node
        self.target_node = secondary_node
        check_nodes = [self.new_node, self.other_node]

        CheckNodeNotDrained(self.lu, remote_node)
        CheckNodeVmCapable(self.lu, remote_node)

        old_node_info = self.cfg.GetNodeInfo(secondary_node)
        assert old_node_info is not None
        if old_node_info.offline and not self.early_release:
          # doesn't make sense to delay the release
          self.early_release = True
          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
                          " early-release mode", secondary_node)

      else:
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                     self.mode)

      # If not specified all disks should be replaced
      if not self.disks:
        self.disks = range(len(self.instance.disks))

    # TODO: This is ugly, but right now we can't distinguish between
    # internally submitted opcodes and external ones. We should fix that.
    if self.remote_node_info:
      # We change the node, lets verify it still meets instance policy
      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
      cluster = self.cfg.GetClusterInfo()
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              new_group_info)
      CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
                             self.cfg, ignore=self.ignore_ipolicy)

    for node in check_nodes:
      CheckNodeOnline(self.lu, node)

    touched_nodes = frozenset(node_name for node_name in [self.new_node,
                                                          self.other_node,
                                                          self.target_node]
                              if node_name is not None)

    # Release unneeded node and node resource locks
    ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    # Release any owned node group
    ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)

    # Check whether disks are valid
    for disk_idx in self.disks:
      instance.FindDisk(disk_idx)

    # Get secondary node IP addresses
    self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node)
                                  in self.cfg.GetMultiNodeInfo(touched_nodes))

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    if __debug__:
      # Verify owned locks before starting operation
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
      assert set(owned_nodes) == set(self.node_secondary_ip), \
          ("Incorrect node locks, owning %s, expected %s" %
           (owned_nodes, self.node_secondary_ip.keys()))
      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
              self.lu.owned_locks(locking.LEVEL_NODE_RES))
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
      assert list(owned_instances) == [self.instance_name], \
          "Instance '%s' not locked" % self.instance_name

      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
          "Should not own any node group lock at this point"

    if not self.disks:
      feedback_fn("No disks need replacement for instance '%s'" %
                  self.instance.name)
      return

    feedback_fn("Replacing disk(s) %s for instance '%s'" %
                (utils.CommaJoin(self.disks), self.instance.name))
    feedback_fn("Current primary node: %s" % self.instance.primary_node)
    feedback_fn("Current secondary node: %s" %
                utils.CommaJoin(self.instance.secondary_nodes))

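    # If the instance is administratively down its disks may not be
    # assembled; activate them just for this operation and shut them down
    # again in the finally clause below.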
    activate_disks = (self.instance.admin_state != constants.ADMINST_UP)

    # Activate the instance disks if we're replacing them on a down instance
    if activate_disks:
      StartInstanceDisks(self.lu, self.instance, True)

    try:
      # Should we replace the secondary node?
      if self.new_node is not None:
        fn = self._ExecDrbd8Secondary
      else:
        fn = self._ExecDrbd8DiskOnly

      result = fn(feedback_fn)
    finally:
      # Deactivate the instance disks if we're replacing them on a
      # down instance
      if activate_disks:
        _SafeShutdownInstanceDisks(self.lu, self.instance)

    assert not self.lu.owned_locks(locking.LEVEL_NODE)

    if __debug__:
      # Verify owned locks
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
      nodes = frozenset(self.node_secondary_ip)
      assert ((self.early_release and not owned_nodes) or
              (not self.early_release and not (set(owned_nodes) - nodes))), \
        ("Not owning the correct locks, early_release=%s, owned=%r,"
         " nodes=%r" % (self.early_release, owned_nodes, nodes))

    return result

  def _CheckVolumeGroup(self, nodes):
    self.lu.LogInfo("Checking volume groups")

    vgname = self.cfg.GetVGName()

    # Make sure volume group exists on all involved nodes
    results = self.rpc.call_vg_list(nodes)
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")

    for node in nodes:
      res = results[node]
      res.Raise("Error checking node %s" % node)
      if vgname not in res.payload:
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
                                 (vgname, node))

  def _CheckDisksExistence(self, nodes):
    # Check disk existence
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      for node in nodes:
        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
        self.cfg.SetDiskID(dev, node)

        result = _BlockdevFind(self, node, dev, self.instance)

        msg = result.fail_msg
        if msg or not result.payload:
          if not msg:
            msg = "disk not found"
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                   (idx, node, msg))

  def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                      (idx, node_name))

      if not CheckDiskConsistency(self.lu, self.instance, dev, node_name,
                                  on_primary, ldisk=ldisk):
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                                 " replace disks for instance %s" %
                                 (node_name, self.instance.name))

  def _CreateNewStorage(self, node_name):
    """Create new storage on the primary or secondary node.

    This is only used for same-node replaces, not for changing the
    secondary node, hence we don't want to modify the existing disk.

    """
    iv_names = {}

    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    for idx, dev in enumerate(disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx)

      self.cfg.SetDiskID(dev, node_name)

      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
      names = _GenerateUniqueNames(self.lu, lv_names)

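      # A DRBD8 disk has exactly two children, the data LV and the metadata
      # LV; build replacement LVs for both, keeping each one in its parent's
      # volume group.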
      (data_disk, meta_disk) = dev.children
      vg_data = data_disk.logical_id[0]
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
                             logical_id=(vg_data, names[0]),
                             params=data_disk.params)
      vg_meta = meta_disk.logical_id[0]
      lv_meta = objects.Disk(dev_type=constants.LD_LV,
                             size=constants.DRBD_META_SIZE,
                             logical_id=(vg_meta, names[1]),
                             params=meta_disk.params)

      new_lvs = [lv_data, lv_meta]
      old_lvs = [child.Copy() for child in dev.children]
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)

      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)

    return iv_names

  def _CheckDevices(self, node_name, iv_names):
    for name, (dev, _, _) in iv_names.iteritems():
      self.cfg.SetDiskID(dev, node_name)

      result = _BlockdevFind(self, node_name, dev, self.instance)

      msg = result.fail_msg
      if msg or not result.payload:
        if not msg:
          msg = "disk not found"
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))

      if result.payload.is_degraded:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

  def _RemoveOldStorage(self, node_name, iv_names):
    for name, (_, old_lvs, _) in iv_names.iteritems():
      self.lu.LogInfo("Remove logical volumes for %s", name)

      for lv in old_lvs:
        self.cfg.SetDiskID(lv, node_name)

        msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
        if msg:
          self.lu.LogWarning("Can't remove old LV: %s", msg,
                             hint="remove unused LVs manually")

  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
    """Replace a disk on the primary or secondary for DRBD 8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaces.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.other_node, self.target_node])
    self._CheckVolumeGroup([self.target_node, self.other_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.other_node,
                                self.other_node == self.instance.primary_node,
                                False)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    iv_names = self._CreateNewStorage(self.target_node)

    # Step: for each lv, detach+rename*2+attach
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)

      result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
                                                     old_lvs)
      result.Raise("Can't detach drbd from local storage on node"
                   " %s for device %s" % (self.target_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)

      # Build the rename list based on what LVs exist on the node
      rename_old_to_new = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(self.target_node, to_ren)
        if not result.fail_msg and result.payload:
          # device exists
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))

      self.lu.LogInfo("Renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(self.target_node,
                                             rename_old_to_new)
      result.Raise("Can't rename old LVs on node %s" % self.target_node)

      # Now we rename the new LVs to the old LVs
      self.lu.LogInfo("Renaming the new LVs on the target node")
      rename_new_to_old = [(new, old.physical_id)
                           for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(self.target_node,
                                             rename_new_to_old)
      result.Raise("Can't rename new LVs on node %s" % self.target_node)

      # Intermediate steps of in memory modifications
      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        self.cfg.SetDiskID(new, self.target_node)

      # We need to modify old_lvs so that removal later removes the
      # right LVs, not the newly added ones; note that old_lvs is a
      # copy here
      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        self.cfg.SetDiskID(disk, self.target_node)

      # Now that the new lvs have the old name, we can add them to the device
      self.lu.LogInfo("Adding new mirror component on %s", self.target_node)
      result = self.rpc.call_blockdev_addchildren(self.target_node,
                                                  (dev, self.instance), new_lvs)
      msg = result.fail_msg
      if msg:
        for new_lv in new_lvs:
          msg2 = self.rpc.call_blockdev_remove(self.target_node,
                                               new_lv).fail_msg
          if msg2:
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                               hint=("cleanup manually the unused logical"
                                     " volumes"))
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)

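    # Steps 5 and 6 are numbered dynamically: with early_release the old
    # storage is removed before the sync, otherwise only after the devices
    # are in sync again.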
    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # Release all node locks while waiting for sync
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)

  def _ExecDrbd8Secondary(self, feedback_fn):
    """Replace the secondary node for DRBD 8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6

    pnode = self.instance.primary_node

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.instance.primary_node])
    self._CheckVolumeGroup([self.instance.primary_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.instance.primary_node, True, True)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
    for idx, dev in enumerate(disks):
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
                      (self.new_node, idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
                             True, GetInstanceInfoText(self.instance), False,
                             excl_stor)

    # Step 4: drbd minors and drbd setup changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    minors = self.cfg.AllocateDRBDMinor([self.new_node
                                         for dev in self.instance.disks],
                                        self.instance.name)
    logging.debug("Allocated minors %r", minors)

    iv_names = {}
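    # Pair every existing DRBD disk with the minor that was just allocated
    # for it on the new node.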
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
      self.lu.LogInfo("Activating a new drbd on %s for disk/%d" %
                      (self.new_node, idx))
      # create new devices on new_node; note that we create two IDs:
      # one without port, so the drbd will be activated without
      # networking information on the new node at this stage, and one
      # with network, for the latter activation in step 4
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if self.instance.primary_node == o_node1:
        p_minor = o_minor1
      else:
        assert self.instance.primary_node == o_node2, "Three-node instance?"
        p_minor = o_minor2

      new_alone_id = (self.instance.primary_node, self.new_node, None,
                      p_minor, new_minor, o_secret)
      new_net_id = (self.instance.primary_node, self.new_node, o_port,
                    p_minor, new_minor, o_secret)

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children,
                              size=dev.size,
                              params={})
      (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
                                            self.cfg)
      try:
        CreateSingleBlockDev(self.lu, self.new_node, self.instance,
                             anno_new_drbd,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)
      except errors.GenericError:
        self.cfg.ReleaseDRBDMinors(self.instance.name)
        raise

    # We have new devices, shutdown the drbd on the old secondary
    for idx, dev in enumerate(self.instance.disks):
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
      self.cfg.SetDiskID(dev, self.target_node)
      msg = self.rpc.call_blockdev_shutdown(self.target_node,
                                            (dev, self.instance)).fail_msg
      if msg:
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                           " node: %s" % (idx, msg),
                           hint=("Please cleanup this device manually as"
                                 " soon as possible"))

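    # After the call below the primary's DRBD devices run standalone (without
    # network) until they are reconnected to the new secondary further down.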
    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2444
    result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
2445
                                               self.instance.disks)[pnode]
2446

    
2447
    msg = result.fail_msg
2448
    if msg:
2449
      # detaches didn't succeed (unlikely)
2450
      self.cfg.ReleaseDRBDMinors(self.instance.name)
2451
      raise errors.OpExecError("Can't detach the disks from the network on"
2452
                               " old node: %s" % (msg,))
2453

    
2454
    # if we managed to detach at least one, we update all the disks of
2455
    # the instance to point to the new secondary
2456
    self.lu.LogInfo("Updating instance configuration")
2457
    for dev, _, new_logical_id in iv_names.itervalues():
2458
      dev.logical_id = new_logical_id
2459
      self.cfg.SetDiskID(dev, self.instance.primary_node)
2460

    
2461
    self.cfg.Update(self.instance, feedback_fn)
2462

    
2463
    # Release all node locks (the configuration has been updated)
2464
    ReleaseLocks(self.lu, locking.LEVEL_NODE)
2465

    
2466
    # and now perform the drbd attach
2467
    self.lu.LogInfo("Attaching primary drbds to new secondary"
2468
                    " (standalone => connected)")
2469
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2470
                                            self.new_node],
2471
                                           self.node_secondary_ip,
2472
                                           (self.instance.disks, self.instance),
2473
                                           self.instance.name,
2474
                                           False)
2475
    for to_node, to_result in result.items():
2476
      msg = to_result.fail_msg
2477
      if msg:
2478
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
2479
                           to_node, msg,
2480
                           hint=("please do a gnt-instance info to see the"
2481
                                 " status of disks"))
2482

    
2483
    cstep = itertools.count(5)
2484

    
2485
    if self.early_release:
2486
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2487
      self._RemoveOldStorage(self.target_node, iv_names)
2488
      # TODO: Check if releasing locks early still makes sense
2489
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2490
    else:
2491
      # Release all resource locks except those used by the instance
2492
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2493
                   keep=self.node_secondary_ip.keys())
2494

    
2495
    # TODO: Can the instance lock be downgraded here? Take the optional disk
2496
    # shutdown in the caller into consideration.
2497

    
2498
    # Wait for sync
2499
    # This can fail as the old devices are degraded and _WaitForSync
2500
    # does a combined result over all disks, so we don't check its return value
2501
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2502
    WaitForSync(self.lu, self.instance)
2503

    
2504
    # Check all devices manually
2505
    self._CheckDevices(self.instance.primary_node, iv_names)
2506

    
2507
    # Step: remove old storage
2508
    if not self.early_release:
2509
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2510
      self._RemoveOldStorage(self.target_node, iv_names)