root / lib / cmdlib / __init__.py @ 31b836b8

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable=W0201,C0302
25

    
26
# W0201 since most LU attributes are defined in CheckPrereq or similar
27
# functions
28

    
29
# C0302: since we have waaaay too many lines in this module
30

    
31
import os
32
import time
33
import logging
34
import copy
35
import OpenSSL
36
import itertools
37
import operator
38

    
39
from ganeti import utils
40
from ganeti import errors
41
from ganeti import hypervisor
42
from ganeti import locking
43
from ganeti import constants
44
from ganeti import objects
45
from ganeti import compat
46
from ganeti import masterd
47
from ganeti import netutils
48
from ganeti import query
49
from ganeti import qlang
50
from ganeti import opcodes
51
from ganeti import ht
52
from ganeti import rpc
53
from ganeti import pathutils
54
from ganeti import network
55
from ganeti.masterd import iallocator
56

    
57
from ganeti.cmdlib.base import ResultWithJobs, LogicalUnit, NoHooksLU, \
58
  Tasklet, _QueryBase
59
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_ONLINE, \
60
  INSTANCE_NOT_RUNNING, CAN_CHANGE_INSTANCE_OFFLINE, \
61
  _ExpandInstanceName, _ExpandItemName, \
62
  _ExpandNodeName, _ShareAll, _CheckNodeGroupInstances, _GetWantedNodes, \
63
  _GetWantedInstances, _RunPostHook, _RedistributeAncillaryFiles, \
64
  _MergeAndVerifyHvState, _MergeAndVerifyDiskState, _GetUpdatedIPolicy, \
65
  _ComputeNewInstanceViolations, _GetUpdatedParams, _CheckOSParams, \
66
  _CheckHVParams, _AdjustCandidatePool, _CheckNodePVs, \
67
  _ComputeIPolicyInstanceViolation, _AnnotateDiskParams, _SupportsOob, \
68
  _ComputeIPolicySpecViolation, _GetDefaultIAllocator, \
69
  _CheckInstancesNodeGroups, _LoadNodeEvacResult, _MapInstanceDisksToNodes, \
70
  _CheckInstanceNodeGroups, _CheckParamsNotGlobal, \
71
  _IsExclusiveStorageEnabledNode, _CheckInstanceState, \
72
  _CheckIAllocatorOrNode, _FindFaultyInstanceDisks
73

    
74
from ganeti.cmdlib.cluster import LUClusterActivateMasterIp, \
75
  LUClusterDeactivateMasterIp, LUClusterConfigQuery, LUClusterDestroy, \
76
  LUClusterPostInit, _ClusterQuery, LUClusterQuery, LUClusterRedistConf, \
77
  LUClusterRename, LUClusterRepairDiskSizes, LUClusterSetParams, \
78
  LUClusterVerify, LUClusterVerifyConfig, LUClusterVerifyGroup, \
79
  LUClusterVerifyDisks
80
from ganeti.cmdlib.group import LUGroupAdd, LUGroupAssignNodes, \
81
  _GroupQuery, LUGroupQuery, LUGroupSetParams, LUGroupRemove, \
82
  LUGroupRename, LUGroupEvacuate, LUGroupVerifyDisks
83
from ganeti.cmdlib.node import LUNodeAdd, LUNodeSetParams, \
84
  LUNodePowercycle, LUNodeEvacuate, LUNodeMigrate, LUNodeModifyStorage, \
85
  _NodeQuery, LUNodeQuery, LUNodeQueryvols, LUNodeQueryStorage, \
86
  LUNodeRemove, LURepairNodeStorage
87
from ganeti.cmdlib.tags import LUTagsGet, LUTagsSearch, LUTagsSet, LUTagsDel
88
from ganeti.cmdlib.network import LUNetworkAdd, LUNetworkRemove, \
89
  LUNetworkSetParams, _NetworkQuery, LUNetworkQuery, LUNetworkConnect, \
90
  LUNetworkDisconnect
91
from ganeti.cmdlib.test import LUTestDelay, LUTestJqueue, LUTestAllocator
92

    
93
import ganeti.masterd.instance # pylint: disable=W0611
94

    
95

    
96
def _IsExclusiveStorageEnabledNodeName(cfg, nodename):
97
  """Whether exclusive_storage is in effect for the given node.
98

99
  @type cfg: L{config.ConfigWriter}
100
  @param cfg: The cluster configuration
101
  @type nodename: string
102
  @param nodename: The node
103
  @rtype: bool
104
  @return: The effective value of exclusive_storage
105
  @raise errors.OpPrereqError: if no node exists with the given name
106

107
  """
108
  ni = cfg.GetNodeInfo(nodename)
109
  if ni is None:
110
    raise errors.OpPrereqError("Invalid node name %s" % nodename,
111
                               errors.ECODE_NOENT)
112
  return _IsExclusiveStorageEnabledNode(cfg, ni)
113

    
114

    
115
def _CopyLockList(names):
116
  """Makes a copy of a list of lock names.
117

118
  Handles L{locking.ALL_SET} correctly.
119

120
  """
121
  if names == locking.ALL_SET:
122
    return locking.ALL_SET
123
  else:
124
    return names[:]
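# Illustrative usage sketch (an editorial addition, not part of the original
# module): _CopyLockList is meant for carrying acquired lock names over to a
# later phase without sharing list state.  The attribute access below assumes
# an LU whose needed_locks has already been populated.
#
#   node_locks = _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
#
# locking.ALL_SET is a sentinel value, so it is returned unchanged; plain
# lists are sliced to get an independent copy.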
125

    
126

    
127
def _ReleaseLocks(lu, level, names=None, keep=None):
128
  """Releases locks owned by an LU.
129

130
  @type lu: L{LogicalUnit}
131
  @param level: Lock level
132
  @type names: list or None
133
  @param names: Names of locks to release
134
  @type keep: list or None
135
  @param keep: Names of locks to retain
136

137
  """
138
  assert not (keep is not None and names is not None), \
139
         "Only one of the 'names' and the 'keep' parameters can be given"
140

    
141
  if names is not None:
142
    should_release = names.__contains__
143
  elif keep:
144
    should_release = lambda name: name not in keep
145
  else:
146
    should_release = None
147

    
148
  owned = lu.owned_locks(level)
149
  if not owned:
150
    # Not owning any lock at this level, do nothing
151
    pass
152

    
153
  elif should_release:
154
    retain = []
155
    release = []
156

    
157
    # Determine which locks to release
158
    for name in owned:
159
      if should_release(name):
160
        release.append(name)
161
      else:
162
        retain.append(name)
163

    
164
    assert len(lu.owned_locks(level)) == (len(retain) + len(release))
165

    
166
    # Release just some locks
167
    lu.glm.release(level, names=release)
168

    
169
    assert frozenset(lu.owned_locks(level)) == frozenset(retain)
170
  else:
171
    # Release everything
172
    lu.glm.release(level)
173

    
174
    assert not lu.glm.is_owned(level), "No locks should be owned"
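# Illustrative usage sketch (an editorial addition, not part of the original
# module): dropping every node lock except the ones still needed after the
# target node has been narrowed down.  "self.op.node_name" is a hypothetical
# opcode field used only for this example.
#
#   _ReleaseLocks(self, locking.LEVEL_NODE, keep=[self.op.node_name])
#
# Passing names=[...] instead releases exactly the listed locks; the two
# keyword arguments are mutually exclusive, as the assertion above enforces.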
175

    
176

    
177
def _CheckNodeOnline(lu, node, msg=None):
178
  """Ensure that a given node is online.
179

180
  @param lu: the LU on behalf of which we make the check
181
  @param node: the node to check
182
  @param msg: if passed, should be a message to replace the default one
183
  @raise errors.OpPrereqError: if the node is offline
184

185
  """
186
  if msg is None:
187
    msg = "Can't use offline node"
188
  if lu.cfg.GetNodeInfo(node).offline:
189
    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)
190

    
191

    
192
def _CheckNodeNotDrained(lu, node):
193
  """Ensure that a given node is not drained.
194

195
  @param lu: the LU on behalf of which we make the check
196
  @param node: the node to check
197
  @raise errors.OpPrereqError: if the node is drained
198

199
  """
200
  if lu.cfg.GetNodeInfo(node).drained:
201
    raise errors.OpPrereqError("Can't use drained node %s" % node,
202
                               errors.ECODE_STATE)
203

    
204

    
205
def _CheckNodeVmCapable(lu, node):
206
  """Ensure that a given node is vm capable.
207

208
  @param lu: the LU on behalf of which we make the check
209
  @param node: the node to check
210
  @raise errors.OpPrereqError: if the node is not vm capable
211

212
  """
213
  if not lu.cfg.GetNodeInfo(node).vm_capable:
214
    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
215
                               errors.ECODE_STATE)
216

    
217

    
218
def _CheckNodeHasOS(lu, node, os_name, force_variant):
219
  """Ensure that a node supports a given OS.
220

221
  @param lu: the LU on behalf of which we make the check
222
  @param node: the node to check
223
  @param os_name: the OS to query about
224
  @param force_variant: whether to ignore variant errors
225
  @raise errors.OpPrereqError: if the node is not supporting the OS
226

227
  """
228
  result = lu.rpc.call_os_get(node, os_name)
229
  result.Raise("OS '%s' not in supported OS list for node %s" %
230
               (os_name, node),
231
               prereq=True, ecode=errors.ECODE_INVAL)
232
  if not force_variant:
233
    _CheckOSVariant(result.payload, os_name)
234

    
235

    
236
def _GetClusterDomainSecret():
237
  """Reads the cluster domain secret.
238

239
  """
240
  return utils.ReadOneLineFile(pathutils.CLUSTER_DOMAIN_SECRET_FILE,
241
                               strict=True)
242

    
243

    
244
def _ComputeIPolicyInstanceSpecViolation(
245
  ipolicy, instance_spec, disk_template,
246
  _compute_fn=_ComputeIPolicySpecViolation):
247
  """Compute if instance specs meets the specs of ipolicy.
248

249
  @type ipolicy: dict
250
  @param ipolicy: The ipolicy to verify against
251
  @type instance_spec: dict
252
  @param instance_spec: The instance spec to verify
253
  @type disk_template: string
254
  @param disk_template: the disk template of the instance
255
  @param _compute_fn: The function to verify ipolicy (unittest only)
256
  @see: L{_ComputeIPolicySpecViolation}
257

258
  """
259
  mem_size = instance_spec.get(constants.ISPEC_MEM_SIZE, None)
260
  cpu_count = instance_spec.get(constants.ISPEC_CPU_COUNT, None)
261
  disk_count = instance_spec.get(constants.ISPEC_DISK_COUNT, 0)
262
  disk_sizes = instance_spec.get(constants.ISPEC_DISK_SIZE, [])
263
  nic_count = instance_spec.get(constants.ISPEC_NIC_COUNT, 0)
264
  spindle_use = instance_spec.get(constants.ISPEC_SPINDLE_USE, None)
265

    
266
  return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
267
                     disk_sizes, spindle_use, disk_template)
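# Illustrative usage sketch (an editorial addition, not part of the original
# module): checking a bare spec dictionary against a policy before an
# instance object exists.  The ISPEC_* keys and DT_PLAIN are real constants;
# the values are made up.
#
#   spec = {
#     constants.ISPEC_MEM_SIZE: 512,
#     constants.ISPEC_CPU_COUNT: 1,
#     constants.ISPEC_DISK_COUNT: 1,
#     constants.ISPEC_DISK_SIZE: [1024],
#     constants.ISPEC_NIC_COUNT: 1,
#   }
#   violations = _ComputeIPolicyInstanceSpecViolation(ipolicy, spec,
#                                                     constants.DT_PLAIN)
#
# An empty result means the spec fits the policy; otherwise the returned list
# describes the individual violations.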
268

    
269

    
270
def _ComputeIPolicyNodeViolation(ipolicy, instance, current_group,
271
                                 target_group, cfg,
272
                                 _compute_fn=_ComputeIPolicyInstanceViolation):
273
  """Compute if instance meets the specs of the new target group.
274

275
  @param ipolicy: The ipolicy to verify
276
  @param instance: The instance object to verify
277
  @param current_group: The current group of the instance
278
  @param target_group: The new group of the instance
279
  @type cfg: L{config.ConfigWriter}
280
  @param cfg: Cluster configuration
281
  @param _compute_fn: The function to verify ipolicy (unittest only)
282
  @see: L{_ComputeIPolicySpecViolation}
283

284
  """
285
  if current_group == target_group:
286
    return []
287
  else:
288
    return _compute_fn(ipolicy, instance, cfg)
289

    
290

    
291
def _CheckTargetNodeIPolicy(lu, ipolicy, instance, node, cfg, ignore=False,
292
                            _compute_fn=_ComputeIPolicyNodeViolation):
293
  """Checks that the target node is correct in terms of instance policy.
294

295
  @param ipolicy: The ipolicy to verify
296
  @param instance: The instance object to verify
297
  @param node: The new node to relocate
298
  @type cfg: L{config.ConfigWriter}
299
  @param cfg: Cluster configuration
300
  @param ignore: Ignore violations of the ipolicy
301
  @param _compute_fn: The function to verify ipolicy (unittest only)
302
  @see: L{_ComputeIPolicySpecViolation}
303

304
  """
305
  primary_node = lu.cfg.GetNodeInfo(instance.primary_node)
306
  res = _compute_fn(ipolicy, instance, primary_node.group, node.group, cfg)
307

    
308
  if res:
309
    msg = ("Instance does not meet target node group's (%s) instance"
310
           " policy: %s") % (node.group, utils.CommaJoin(res))
311
    if ignore:
312
      lu.LogWarning(msg)
313
    else:
314
      raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
315

    
316

    
317
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
318
                          minmem, maxmem, vcpus, nics, disk_template, disks,
319
                          bep, hvp, hypervisor_name, tags):
320
  """Builds instance related env variables for hooks
321

322
  This builds the hook environment from individual variables.
323

324
  @type name: string
325
  @param name: the name of the instance
326
  @type primary_node: string
327
  @param primary_node: the name of the instance's primary node
328
  @type secondary_nodes: list
329
  @param secondary_nodes: list of secondary nodes as strings
330
  @type os_type: string
331
  @param os_type: the name of the instance's OS
332
  @type status: string
333
  @param status: the desired status of the instance
334
  @type minmem: string
335
  @param minmem: the minimum memory size of the instance
336
  @type maxmem: string
337
  @param maxmem: the maximum memory size of the instance
338
  @type vcpus: string
339
  @param vcpus: the count of VCPUs the instance has
340
  @type nics: list
341
  @param nics: list of tuples (name, uuid, ip, mac, mode, link, net, netinfo)
342
      representing the NICs the instance has
343
  @type disk_template: string
344
  @param disk_template: the disk template of the instance
345
  @type disks: list
346
  @param disks: list of tuples (name, size, mode)
347
  @type bep: dict
348
  @param bep: the backend parameters for the instance
349
  @type hvp: dict
350
  @param hvp: the hypervisor parameters for the instance
351
  @type hypervisor_name: string
352
  @param hypervisor_name: the hypervisor for the instance
353
  @type tags: list
354
  @param tags: list of instance tags as strings
355
  @rtype: dict
356
  @return: the hook environment for this instance
357

358
  """
359
  env = {
360
    "OP_TARGET": name,
361
    "INSTANCE_NAME": name,
362
    "INSTANCE_PRIMARY": primary_node,
363
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
364
    "INSTANCE_OS_TYPE": os_type,
365
    "INSTANCE_STATUS": status,
366
    "INSTANCE_MINMEM": minmem,
367
    "INSTANCE_MAXMEM": maxmem,
368
    # TODO(2.9) remove deprecated "memory" value
369
    "INSTANCE_MEMORY": maxmem,
370
    "INSTANCE_VCPUS": vcpus,
371
    "INSTANCE_DISK_TEMPLATE": disk_template,
372
    "INSTANCE_HYPERVISOR": hypervisor_name,
373
  }
374
  if nics:
375
    nic_count = len(nics)
376
    for idx, (name, _, ip, mac, mode, link, net, netinfo) in enumerate(nics):
377
      if ip is None:
378
        ip = ""
379
      env["INSTANCE_NIC%d_NAME" % idx] = name
380
      env["INSTANCE_NIC%d_IP" % idx] = ip
381
      env["INSTANCE_NIC%d_MAC" % idx] = mac
382
      env["INSTANCE_NIC%d_MODE" % idx] = mode
383
      env["INSTANCE_NIC%d_LINK" % idx] = link
384
      if netinfo:
385
        nobj = objects.Network.FromDict(netinfo)
386
        env.update(nobj.HooksDict("INSTANCE_NIC%d_" % idx))
387
      elif network:
388
        # FIXME: broken network reference: the instance NIC specifies a
389
        # network, but the relevant network entry was not in the config. This
390
        # should be made impossible.
391
        env["INSTANCE_NIC%d_NETWORK_NAME" % idx] = net
392
      if mode == constants.NIC_MODE_BRIDGED:
393
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
394
  else:
395
    nic_count = 0
396

    
397
  env["INSTANCE_NIC_COUNT"] = nic_count
398

    
399
  if disks:
400
    disk_count = len(disks)
401
    for idx, (name, size, mode) in enumerate(disks):
402
      env["INSTANCE_DISK%d_NAME" % idx] = name
403
      env["INSTANCE_DISK%d_SIZE" % idx] = size
404
      env["INSTANCE_DISK%d_MODE" % idx] = mode
405
  else:
406
    disk_count = 0
407

    
408
  env["INSTANCE_DISK_COUNT"] = disk_count
409

    
410
  if not tags:
411
    tags = []
412

    
413
  env["INSTANCE_TAGS"] = " ".join(tags)
414

    
415
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
416
    for key, value in source.items():
417
      env["INSTANCE_%s_%s" % (kind, key)] = value
418

    
419
  return env
420

    
421

    
422
def _NICToTuple(lu, nic):
423
  """Build a tupple of nic information.
424

425
  @type lu:  L{LogicalUnit}
426
  @param lu: the logical unit on whose behalf we execute
427
  @type nic: L{objects.NIC}
428
  @param nic: nic to convert to hooks tuple
429

430
  """
431
  cluster = lu.cfg.GetClusterInfo()
432
  filled_params = cluster.SimpleFillNIC(nic.nicparams)
433
  mode = filled_params[constants.NIC_MODE]
434
  link = filled_params[constants.NIC_LINK]
435
  netinfo = None
436
  if nic.network:
437
    nobj = lu.cfg.GetNetwork(nic.network)
438
    netinfo = objects.Network.ToDict(nobj)
439
  return (nic.name, nic.uuid, nic.ip, nic.mac, mode, link, nic.network, netinfo)
440

    
441

    
442
def _NICListToTuple(lu, nics):
443
  """Build a list of nic information tuples.
444

445
  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
446
  value in LUInstanceQueryData.
447

448
  @type lu:  L{LogicalUnit}
449
  @param lu: the logical unit on whose behalf we execute
450
  @type nics: list of L{objects.NIC}
451
  @param nics: list of nics to convert to hooks tuples
452

453
  """
454
  hooks_nics = []
455
  for nic in nics:
456
    hooks_nics.append(_NICToTuple(lu, nic))
457
  return hooks_nics
458

    
459

    
460
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
461
  """Builds instance related env variables for hooks from an object.
462

463
  @type lu: L{LogicalUnit}
464
  @param lu: the logical unit on whose behalf we execute
465
  @type instance: L{objects.Instance}
466
  @param instance: the instance for which we should build the
467
      environment
468
  @type override: dict
469
  @param override: dictionary with key/values that will override
470
      our values
471
  @rtype: dict
472
  @return: the hook environment dictionary
473

474
  """
475
  cluster = lu.cfg.GetClusterInfo()
476
  bep = cluster.FillBE(instance)
477
  hvp = cluster.FillHV(instance)
478
  args = {
479
    "name": instance.name,
480
    "primary_node": instance.primary_node,
481
    "secondary_nodes": instance.secondary_nodes,
482
    "os_type": instance.os,
483
    "status": instance.admin_state,
484
    "maxmem": bep[constants.BE_MAXMEM],
485
    "minmem": bep[constants.BE_MINMEM],
486
    "vcpus": bep[constants.BE_VCPUS],
487
    "nics": _NICListToTuple(lu, instance.nics),
488
    "disk_template": instance.disk_template,
489
    "disks": [(disk.name, disk.size, disk.mode)
490
              for disk in instance.disks],
491
    "bep": bep,
492
    "hvp": hvp,
493
    "hypervisor_name": instance.hypervisor,
494
    "tags": instance.tags,
495
  }
496
  if override:
497
    args.update(override)
498
  return _BuildInstanceHookEnv(**args) # pylint: disable=W0142
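# Illustrative sketch (an editorial addition, not part of the original
# module): the kind of dictionary _BuildInstanceHookEnvByObject produces for
# a one-NIC, one-disk instance (values are made up):
#
#   {
#     "INSTANCE_NAME": "inst1.example.com",
#     "INSTANCE_PRIMARY": "node1.example.com",
#     "INSTANCE_NIC_COUNT": 1,
#     "INSTANCE_NIC0_MAC": "aa:00:00:11:22:33",
#     "INSTANCE_DISK_COUNT": 1,
#     "INSTANCE_DISK0_SIZE": 1024,
#     ...
#   }
#
# The "override" argument can replace or extend any of these values before
# the hook environment is handed on.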
499

    
500

    
501
def _CheckNicsBridgesExist(lu, target_nics, target_node):
502
  """Check that the brigdes needed by a list of nics exist.
503

504
  """
505
  cluster = lu.cfg.GetClusterInfo()
506
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
507
  brlist = [params[constants.NIC_LINK] for params in paramslist
508
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
509
  if brlist:
510
    result = lu.rpc.call_bridges_exist(target_node, brlist)
511
    result.Raise("Error checking bridges on destination node '%s'" %
512
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
513

    
514

    
515
def _CheckInstanceBridgesExist(lu, instance, node=None):
516
  """Check that the brigdes needed by an instance exist.
517

518
  """
519
  if node is None:
520
    node = instance.primary_node
521
  _CheckNicsBridgesExist(lu, instance.nics, node)
522

    
523

    
524
def _CheckOSVariant(os_obj, name):
525
  """Check whether an OS name conforms to the os variants specification.
526

527
  @type os_obj: L{objects.OS}
528
  @param os_obj: OS object to check
529
  @type name: string
530
  @param name: OS name passed by the user, to check for validity
531

532
  """
533
  variant = objects.OS.GetVariant(name)
534
  if not os_obj.supported_variants:
535
    if variant:
536
      raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'"
537
                                 " passed)" % (os_obj.name, variant),
538
                                 errors.ECODE_INVAL)
539
    return
540
  if not variant:
541
    raise errors.OpPrereqError("OS name must include a variant",
542
                               errors.ECODE_INVAL)
543

    
544
  if variant not in os_obj.supported_variants:
545
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
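# Illustrative sketch (an editorial addition, not part of the original
# module): for an OS that declares supported_variants = ["wheezy", "jessie"],
# a user-supplied name such as "debootstrap+jessie" passes the check above,
# while a bare "debootstrap" is rejected because the variant part after the
# "+" separator is mandatory for such an OS.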
546

    
547

    
548
def _CheckHostnameSane(lu, name):
549
  """Ensures that a given hostname resolves to a 'sane' name.
550

551
  The given name is required to be a prefix of the resolved hostname,
552
  to prevent accidental mismatches.
553

554
  @param lu: the logical unit on behalf of which we're checking
555
  @param name: the name we should resolve and check
556
  @return: the resolved hostname object
557

558
  """
559
  hostname = netutils.GetHostname(name=name)
560
  if hostname.name != name:
561
    lu.LogInfo("Resolved given name '%s' to '%s'", name, hostname.name)
562
  if not utils.MatchNameComponent(name, [hostname.name]):
563
    raise errors.OpPrereqError(("Resolved hostname '%s' does not look the"
564
                                " same as given hostname '%s'") %
565
                                (hostname.name, name), errors.ECODE_INVAL)
566
  return hostname
567

    
568

    
569
def _WaitForSync(lu, instance, disks=None, oneshot=False):
570
  """Sleep and poll for an instance's disk to sync.
571

572
  """
573
  if not instance.disks or disks is not None and not disks:
574
    return True
575

    
576
  disks = _ExpandCheckDisks(instance, disks)
577

    
578
  if not oneshot:
579
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
580

    
581
  node = instance.primary_node
582

    
583
  for dev in disks:
584
    lu.cfg.SetDiskID(dev, node)
585

    
586
  # TODO: Convert to utils.Retry
587

    
588
  retries = 0
589
  degr_retries = 10 # in seconds, as we sleep 1 second each time
590
  while True:
591
    max_time = 0
592
    done = True
593
    cumul_degraded = False
594
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
595
    msg = rstats.fail_msg
596
    if msg:
597
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
598
      retries += 1
599
      if retries >= 10:
600
        raise errors.RemoteError("Can't contact node %s for mirror data,"
601
                                 " aborting." % node)
602
      time.sleep(6)
603
      continue
604
    rstats = rstats.payload
605
    retries = 0
606
    for i, mstat in enumerate(rstats):
607
      if mstat is None:
608
        lu.LogWarning("Can't compute data for node %s/%s",
609
                           node, disks[i].iv_name)
610
        continue
611

    
612
      cumul_degraded = (cumul_degraded or
613
                        (mstat.is_degraded and mstat.sync_percent is None))
614
      if mstat.sync_percent is not None:
615
        done = False
616
        if mstat.estimated_time is not None:
617
          rem_time = ("%s remaining (estimated)" %
618
                      utils.FormatSeconds(mstat.estimated_time))
619
          max_time = mstat.estimated_time
620
        else:
621
          rem_time = "no time estimate"
622
        lu.LogInfo("- device %s: %5.2f%% done, %s",
623
                   disks[i].iv_name, mstat.sync_percent, rem_time)
624

    
625
    # if we're done but degraded, let's do a few small retries, to
626
    # make sure we see a stable and not transient situation; therefore
627
    # we force restart of the loop
628
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
629
      logging.info("Degraded disks found, %d retries left", degr_retries)
630
      degr_retries -= 1
631
      time.sleep(1)
632
      continue
633

    
634
    if done or oneshot:
635
      break
636

    
637
    time.sleep(min(60, max_time))
638

    
639
  if done:
640
    lu.LogInfo("Instance %s's disks are in sync", instance.name)
641

    
642
  return not cumul_degraded
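# Illustrative usage sketch (an editorial addition, not part of the original
# module), mirroring how LUInstanceActivateDisks.Exec below consumes the
# return value:
#
#   if not _WaitForSync(self, instance):
#     raise errors.OpExecError("Some disks of the instance are degraded!")
#
# The loop above sleeps min(60, estimated_time) seconds between polls,
# tolerates up to ten consecutive RPC failures and retries a few times when
# the disks report "done but degraded" before returning.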
643

    
644

    
645
def _BlockdevFind(lu, node, dev, instance):
646
  """Wrapper around call_blockdev_find to annotate diskparams.
647

648
  @param lu: A reference to the lu object
649
  @param node: The node to call out to
650
  @param dev: The device to find
651
  @param instance: The instance object the device belongs to
652
  @return: The result of the RPC call
653

654
  """
655
  (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
656
  return lu.rpc.call_blockdev_find(node, disk)
657

    
658

    
659
def _CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
660
  """Wrapper around L{_CheckDiskConsistencyInner}.
661

662
  """
663
  (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
664
  return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
665
                                    ldisk=ldisk)
666

    
667

    
668
def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
669
                               ldisk=False):
670
  """Check that mirrors are not degraded.
671

672
  @attention: The device has to be annotated already.
673

674
  The ldisk parameter, if True, will change the test from the
675
  is_degraded attribute (which represents overall non-ok status for
676
  the device(s)) to the ldisk (representing the local storage status).
677

678
  """
679
  lu.cfg.SetDiskID(dev, node)
680

    
681
  result = True
682

    
683
  if on_primary or dev.AssembleOnSecondary():
684
    rstats = lu.rpc.call_blockdev_find(node, dev)
685
    msg = rstats.fail_msg
686
    if msg:
687
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
688
      result = False
689
    elif not rstats.payload:
690
      lu.LogWarning("Can't find disk on node %s", node)
691
      result = False
692
    else:
693
      if ldisk:
694
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
695
      else:
696
        result = result and not rstats.payload.is_degraded
697

    
698
  if dev.children:
699
    for child in dev.children:
700
      result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
701
                                                     on_primary)
702

    
703
  return result
704

    
705

    
706
class LUOobCommand(NoHooksLU):
707
  """Logical unit for OOB handling.
708

709
  """
710
  REQ_BGL = False
711
  _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)
712

    
713
  def ExpandNames(self):
714
    """Gather locks we need.
715

716
    """
717
    if self.op.node_names:
718
      self.op.node_names = _GetWantedNodes(self, self.op.node_names)
719
      lock_names = self.op.node_names
720
    else:
721
      lock_names = locking.ALL_SET
722

    
723
    self.needed_locks = {
724
      locking.LEVEL_NODE: lock_names,
725
      }
726

    
727
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
728

    
729
    if not self.op.node_names:
730
      # Acquire node allocation lock only if all nodes are affected
731
      self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
732

    
733
  def CheckPrereq(self):
734
    """Check prerequisites.
735

736
    This checks:
737
     - the node exists in the configuration
738
     - OOB is supported
739

740
    Any errors are signaled by raising errors.OpPrereqError.
741

742
    """
743
    self.nodes = []
744
    self.master_node = self.cfg.GetMasterNode()
745

    
746
    assert self.op.power_delay >= 0.0
747

    
748
    if self.op.node_names:
749
      if (self.op.command in self._SKIP_MASTER and
750
          self.master_node in self.op.node_names):
751
        master_node_obj = self.cfg.GetNodeInfo(self.master_node)
752
        master_oob_handler = _SupportsOob(self.cfg, master_node_obj)
753

    
754
        if master_oob_handler:
755
          additional_text = ("run '%s %s %s' if you want to operate on the"
756
                             " master regardless") % (master_oob_handler,
757
                                                      self.op.command,
758
                                                      self.master_node)
759
        else:
760
          additional_text = "it does not support out-of-band operations"
761

    
762
        raise errors.OpPrereqError(("Operating on the master node %s is not"
763
                                    " allowed for %s; %s") %
764
                                   (self.master_node, self.op.command,
765
                                    additional_text), errors.ECODE_INVAL)
766
    else:
767
      self.op.node_names = self.cfg.GetNodeList()
768
      if self.op.command in self._SKIP_MASTER:
769
        self.op.node_names.remove(self.master_node)
770

    
771
    if self.op.command in self._SKIP_MASTER:
772
      assert self.master_node not in self.op.node_names
773

    
774
    for (node_name, node) in self.cfg.GetMultiNodeInfo(self.op.node_names):
775
      if node is None:
776
        raise errors.OpPrereqError("Node %s not found" % node_name,
777
                                   errors.ECODE_NOENT)
778
      else:
779
        self.nodes.append(node)
780

    
781
      if (not self.op.ignore_status and
782
          (self.op.command == constants.OOB_POWER_OFF and not node.offline)):
783
        raise errors.OpPrereqError(("Cannot power off node %s because it is"
784
                                    " not marked offline") % node_name,
785
                                   errors.ECODE_STATE)
786

    
787
  def Exec(self, feedback_fn):
788
    """Execute OOB and return result if we expect any.
789

790
    """
791
    master_node = self.master_node
792
    ret = []
793

    
794
    for idx, node in enumerate(utils.NiceSort(self.nodes,
795
                                              key=lambda node: node.name)):
796
      node_entry = [(constants.RS_NORMAL, node.name)]
797
      ret.append(node_entry)
798

    
799
      oob_program = _SupportsOob(self.cfg, node)
800

    
801
      if not oob_program:
802
        node_entry.append((constants.RS_UNAVAIL, None))
803
        continue
804

    
805
      logging.info("Executing out-of-band command '%s' using '%s' on %s",
806
                   self.op.command, oob_program, node.name)
807
      result = self.rpc.call_run_oob(master_node, oob_program,
808
                                     self.op.command, node.name,
809
                                     self.op.timeout)
810

    
811
      if result.fail_msg:
812
        self.LogWarning("Out-of-band RPC failed on node '%s': %s",
813
                        node.name, result.fail_msg)
814
        node_entry.append((constants.RS_NODATA, None))
815
      else:
816
        try:
817
          self._CheckPayload(result)
818
        except errors.OpExecError, err:
819
          self.LogWarning("Payload returned by node '%s' is not valid: %s",
820
                          node.name, err)
821
          node_entry.append((constants.RS_NODATA, None))
822
        else:
823
          if self.op.command == constants.OOB_HEALTH:
824
            # For health we should log important events
825
            for item, status in result.payload:
826
              if status in [constants.OOB_STATUS_WARNING,
827
                            constants.OOB_STATUS_CRITICAL]:
828
                self.LogWarning("Item '%s' on node '%s' has status '%s'",
829
                                item, node.name, status)
830

    
831
          if self.op.command == constants.OOB_POWER_ON:
832
            node.powered = True
833
          elif self.op.command == constants.OOB_POWER_OFF:
834
            node.powered = False
835
          elif self.op.command == constants.OOB_POWER_STATUS:
836
            powered = result.payload[constants.OOB_POWER_STATUS_POWERED]
837
            if powered != node.powered:
838
              logging.warning(("Recorded power state (%s) of node '%s' does not"
839
                               " match actual power state (%s)"), node.powered,
840
                              node.name, powered)
841

    
842
          # For configuration changing commands we should update the node
843
          if self.op.command in (constants.OOB_POWER_ON,
844
                                 constants.OOB_POWER_OFF):
845
            self.cfg.Update(node, feedback_fn)
846

    
847
          node_entry.append((constants.RS_NORMAL, result.payload))
848

    
849
          if (self.op.command == constants.OOB_POWER_ON and
850
              idx < len(self.nodes) - 1):
851
            time.sleep(self.op.power_delay)
852

    
853
    return ret
854

    
855
  def _CheckPayload(self, result):
856
    """Checks if the payload is valid.
857

858
    @param result: RPC result
859
    @raises errors.OpExecError: If payload is not valid
860

861
    """
862
    errs = []
863
    if self.op.command == constants.OOB_HEALTH:
864
      if not isinstance(result.payload, list):
865
        errs.append("command 'health' is expected to return a list but got %s" %
866
                    type(result.payload))
867
      else:
868
        for item, status in result.payload:
869
          if status not in constants.OOB_STATUSES:
870
            errs.append("health item '%s' has invalid status '%s'" %
871
                        (item, status))
872

    
873
    if self.op.command == constants.OOB_POWER_STATUS:
874
      if not isinstance(result.payload, dict):
875
        errs.append("power-status is expected to return a dict but got %s" %
876
                    type(result.payload))
877

    
878
    if self.op.command in [
879
      constants.OOB_POWER_ON,
880
      constants.OOB_POWER_OFF,
881
      constants.OOB_POWER_CYCLE,
882
      ]:
883
      if result.payload is not None:
884
        errs.append("%s is expected to not return payload but got '%s'" %
885
                    (self.op.command, result.payload))
886

    
887
    if errs:
888
      raise errors.OpExecError("Check of out-of-band payload failed due to %s" %
889
                               utils.CommaJoin(errs))
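  # Illustrative sketch (an editorial addition, not part of the original
  # module): Exec returns one entry per node, each a list of (status, data)
  # pairs, e.g. for a power-status command (values are made up):
  #
  #   [[(constants.RS_NORMAL, "node1"),
  #     (constants.RS_NORMAL, {constants.OOB_POWER_STATUS_POWERED: True})],
  #    [(constants.RS_NORMAL, "node2"), (constants.RS_UNAVAIL, None)]]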
890

    
891

    
892
class _OsQuery(_QueryBase):
893
  FIELDS = query.OS_FIELDS
894

    
895
  def ExpandNames(self, lu):
896
    # Lock all nodes in shared mode
897
    # Temporary removal of locks, should be reverted later
898
    # TODO: reintroduce locks when they are lighter-weight
899
    lu.needed_locks = {}
900
    #self.share_locks[locking.LEVEL_NODE] = 1
901
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
902

    
903
    # The following variables interact with _QueryBase._GetNames
904
    if self.names:
905
      self.wanted = self.names
906
    else:
907
      self.wanted = locking.ALL_SET
908

    
909
    self.do_locking = self.use_locking
910

    
911
  def DeclareLocks(self, lu, level):
912
    pass
913

    
914
  @staticmethod
915
  def _DiagnoseByOS(rlist):
916
    """Remaps a per-node return list into an a per-os per-node dictionary
917

918
    @param rlist: a map with node names as keys and OS objects as values
919

920
    @rtype: dict
921
    @return: a dictionary with osnames as keys and as value another
922
        map, with nodes as keys and tuples of (path, status, diagnose,
923
        variants, parameters, api_versions) as values, eg::
924

925
          {"debian-etch": {"node1": [(/usr/lib/..., True, "", [], []),
926
                                     (/srv/..., False, "invalid api")],
927
                           "node2": [(/srv/..., True, "", [], [])]}
928
          }
929

930
    """
931
    all_os = {}
932
    # we build here the list of nodes that didn't fail the RPC (at RPC
933
    # level), so that nodes with a non-responding node daemon don't
934
    # make all OSes invalid
935
    good_nodes = [node_name for node_name in rlist
936
                  if not rlist[node_name].fail_msg]
937
    for node_name, nr in rlist.items():
938
      if nr.fail_msg or not nr.payload:
939
        continue
940
      for (name, path, status, diagnose, variants,
941
           params, api_versions) in nr.payload:
942
        if name not in all_os:
943
          # build a list of nodes for this os containing empty lists
944
          # for each node in node_list
945
          all_os[name] = {}
946
          for nname in good_nodes:
947
            all_os[name][nname] = []
948
        # convert params from [name, help] to (name, help)
949
        params = [tuple(v) for v in params]
950
        all_os[name][node_name].append((path, status, diagnose,
951
                                        variants, params, api_versions))
952
    return all_os
953

    
954
  def _GetQueryData(self, lu):
955
    """Computes the list of nodes and their attributes.
956

957
    """
958
    # Locking is not used
959
    assert not (compat.any(lu.glm.is_owned(level)
960
                           for level in locking.LEVELS
961
                           if level != locking.LEVEL_CLUSTER) or
962
                self.do_locking or self.use_locking)
963

    
964
    valid_nodes = [node.name
965
                   for node in lu.cfg.GetAllNodesInfo().values()
966
                   if not node.offline and node.vm_capable]
967
    pol = self._DiagnoseByOS(lu.rpc.call_os_diagnose(valid_nodes))
968
    cluster = lu.cfg.GetClusterInfo()
969

    
970
    data = {}
971

    
972
    for (os_name, os_data) in pol.items():
973
      info = query.OsInfo(name=os_name, valid=True, node_status=os_data,
974
                          hidden=(os_name in cluster.hidden_os),
975
                          blacklisted=(os_name in cluster.blacklisted_os))
976

    
977
      variants = set()
978
      parameters = set()
979
      api_versions = set()
980

    
981
      for idx, osl in enumerate(os_data.values()):
982
        info.valid = bool(info.valid and osl and osl[0][1])
983
        if not info.valid:
984
          break
985

    
986
        (node_variants, node_params, node_api) = osl[0][3:6]
987
        if idx == 0:
988
          # First entry
989
          variants.update(node_variants)
990
          parameters.update(node_params)
991
          api_versions.update(node_api)
992
        else:
993
          # Filter out inconsistent values
994
          variants.intersection_update(node_variants)
995
          parameters.intersection_update(node_params)
996
          api_versions.intersection_update(node_api)
997

    
998
      info.variants = list(variants)
999
      info.parameters = list(parameters)
1000
      info.api_versions = list(api_versions)
1001

    
1002
      data[os_name] = info
1003

    
1004
    # Prepare data in requested order
1005
    return [data[name] for name in self._GetNames(lu, pol.keys(), None)
1006
            if name in data]
1007

    
1008

    
1009
class LUOsDiagnose(NoHooksLU):
1010
  """Logical unit for OS diagnose/query.
1011

1012
  """
1013
  REQ_BGL = False
1014

    
1015
  @staticmethod
1016
  def _BuildFilter(fields, names):
1017
    """Builds a filter for querying OSes.
1018

1019
    """
1020
    name_filter = qlang.MakeSimpleFilter("name", names)
1021

    
1022
    # Legacy behaviour: Hide hidden, blacklisted or invalid OSes if the
1023
    # respective field is not requested
1024
    status_filter = [[qlang.OP_NOT, [qlang.OP_TRUE, fname]]
1025
                     for fname in ["hidden", "blacklisted"]
1026
                     if fname not in fields]
1027
    if "valid" not in fields:
1028
      status_filter.append([qlang.OP_TRUE, "valid"])
1029

    
1030
    if status_filter:
1031
      status_filter.insert(0, qlang.OP_AND)
1032
    else:
1033
      status_filter = None
1034

    
1035
    if name_filter and status_filter:
1036
      return [qlang.OP_AND, name_filter, status_filter]
1037
    elif name_filter:
1038
      return name_filter
1039
    else:
1040
      return status_filter
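  # Illustrative sketch (an editorial addition, not part of the original
  # module): with fields=["name"] and names=[], _BuildFilter hides hidden,
  # blacklisted and invalid OSes, i.e. it returns roughly:
  #
  #   [qlang.OP_AND,
  #    [qlang.OP_NOT, [qlang.OP_TRUE, "hidden"]],
  #    [qlang.OP_NOT, [qlang.OP_TRUE, "blacklisted"]],
  #    [qlang.OP_TRUE, "valid"]]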
1041

    
1042
  def CheckArguments(self):
1043
    self.oq = _OsQuery(self._BuildFilter(self.op.output_fields, self.op.names),
1044
                       self.op.output_fields, False)
1045

    
1046
  def ExpandNames(self):
1047
    self.oq.ExpandNames(self)
1048

    
1049
  def Exec(self, feedback_fn):
1050
    return self.oq.OldStyleQuery(self)
1051

    
1052

    
1053
class _ExtStorageQuery(_QueryBase):
1054
  FIELDS = query.EXTSTORAGE_FIELDS
1055

    
1056
  def ExpandNames(self, lu):
1057
    # Lock all nodes in shared mode
1058
    # Temporary removal of locks, should be reverted later
1059
    # TODO: reintroduce locks when they are lighter-weight
1060
    lu.needed_locks = {}
1061
    #self.share_locks[locking.LEVEL_NODE] = 1
1062
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1063

    
1064
    # The following variables interact with _QueryBase._GetNames
1065
    if self.names:
1066
      self.wanted = self.names
1067
    else:
1068
      self.wanted = locking.ALL_SET
1069

    
1070
    self.do_locking = self.use_locking
1071

    
1072
  def DeclareLocks(self, lu, level):
1073
    pass
1074

    
1075
  @staticmethod
1076
  def _DiagnoseByProvider(rlist):
1077
    """Remaps a per-node return list into an a per-provider per-node dictionary
1078

1079
    @param rlist: a map with node names as keys and ExtStorage objects as values
1080

1081
    @rtype: dict
1082
    @return: a dictionary with extstorage providers as keys and as
1083
        value another map, with nodes as keys and tuples of
1084
        (path, status, diagnose, parameters) as values, eg::
1085

1086
          {"provider1": {"node1": [(/usr/lib/..., True, "", [])]
1087
                         "node2": [(/srv/..., False, "missing file")]
1088
                         "node3": [(/srv/..., True, "", [])]
1089
          }
1090

1091
    """
1092
    all_es = {}
1093
    # we build here the list of nodes that didn't fail the RPC (at RPC
1094
    # level), so that nodes with a non-responding node daemon don't
1095
    # make all ExtStorage providers invalid
1096
    good_nodes = [node_name for node_name in rlist
1097
                  if not rlist[node_name].fail_msg]
1098
    for node_name, nr in rlist.items():
1099
      if nr.fail_msg or not nr.payload:
1100
        continue
1101
      for (name, path, status, diagnose, params) in nr.payload:
1102
        if name not in all_es:
1103
          # build a list of nodes for this provider containing empty lists
1104
          # for each node in node_list
1105
          all_es[name] = {}
1106
          for nname in good_nodes:
1107
            all_es[name][nname] = []
1108
        # convert params from [name, help] to (name, help)
1109
        params = [tuple(v) for v in params]
1110
        all_es[name][node_name].append((path, status, diagnose, params))
1111
    return all_es
1112

    
1113
  def _GetQueryData(self, lu):
1114
    """Computes the list of nodes and their attributes.
1115

1116
    """
1117
    # Locking is not used
1118
    assert not (compat.any(lu.glm.is_owned(level)
1119
                           for level in locking.LEVELS
1120
                           if level != locking.LEVEL_CLUSTER) or
1121
                self.do_locking or self.use_locking)
1122

    
1123
    valid_nodes = [node.name
1124
                   for node in lu.cfg.GetAllNodesInfo().values()
1125
                   if not node.offline and node.vm_capable]
1126
    pol = self._DiagnoseByProvider(lu.rpc.call_extstorage_diagnose(valid_nodes))
1127

    
1128
    data = {}
1129

    
1130
    nodegroup_list = lu.cfg.GetNodeGroupList()
1131

    
1132
    for (es_name, es_data) in pol.items():
1133
      # For every provider compute the nodegroup validity.
1134
      # To do this we need to check the validity of each node in es_data
1135
      # and then construct the corresponding nodegroup dict:
1136
      #      { nodegroup1: status
1137
      #        nodegroup2: status
1138
      #      }
1139
      ndgrp_data = {}
1140
      for nodegroup in nodegroup_list:
1141
        ndgrp = lu.cfg.GetNodeGroup(nodegroup)
1142

    
1143
        nodegroup_nodes = ndgrp.members
1144
        nodegroup_name = ndgrp.name
1145
        node_statuses = []
1146

    
1147
        for node in nodegroup_nodes:
1148
          if node in valid_nodes:
1149
            if es_data[node] != []:
1150
              node_status = es_data[node][0][1]
1151
              node_statuses.append(node_status)
1152
            else:
1153
              node_statuses.append(False)
1154

    
1155
        if False in node_statuses:
1156
          ndgrp_data[nodegroup_name] = False
1157
        else:
1158
          ndgrp_data[nodegroup_name] = True
1159

    
1160
      # Compute the provider's parameters
1161
      parameters = set()
1162
      for idx, esl in enumerate(es_data.values()):
1163
        valid = bool(esl and esl[0][1])
1164
        if not valid:
1165
          break
1166

    
1167
        node_params = esl[0][3]
1168
        if idx == 0:
1169
          # First entry
1170
          parameters.update(node_params)
1171
        else:
1172
          # Filter out inconsistent values
1173
          parameters.intersection_update(node_params)
1174

    
1175
      params = list(parameters)
1176

    
1177
      # Now fill all the info for this provider
1178
      info = query.ExtStorageInfo(name=es_name, node_status=es_data,
1179
                                  nodegroup_status=ndgrp_data,
1180
                                  parameters=params)
1181

    
1182
      data[es_name] = info
1183

    
1184
    # Prepare data in requested order
1185
    return [data[name] for name in self._GetNames(lu, pol.keys(), None)
1186
            if name in data]
1187

    
1188

    
1189
class LUExtStorageDiagnose(NoHooksLU):
1190
  """Logical unit for ExtStorage diagnose/query.
1191

1192
  """
1193
  REQ_BGL = False
1194

    
1195
  def CheckArguments(self):
1196
    self.eq = _ExtStorageQuery(qlang.MakeSimpleFilter("name", self.op.names),
1197
                               self.op.output_fields, False)
1198

    
1199
  def ExpandNames(self):
1200
    self.eq.ExpandNames(self)
1201

    
1202
  def Exec(self, feedback_fn):
1203
    return self.eq.OldStyleQuery(self)
1204

    
1205

    
1206
class _InstanceQuery(_QueryBase):
1207
  FIELDS = query.INSTANCE_FIELDS
1208

    
1209
  def ExpandNames(self, lu):
1210
    lu.needed_locks = {}
1211
    lu.share_locks = _ShareAll()
1212

    
1213
    if self.names:
1214
      self.wanted = _GetWantedInstances(lu, self.names)
1215
    else:
1216
      self.wanted = locking.ALL_SET
1217

    
1218
    self.do_locking = (self.use_locking and
1219
                       query.IQ_LIVE in self.requested_data)
1220
    if self.do_locking:
1221
      lu.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
1222
      lu.needed_locks[locking.LEVEL_NODEGROUP] = []
1223
      lu.needed_locks[locking.LEVEL_NODE] = []
1224
      lu.needed_locks[locking.LEVEL_NETWORK] = []
1225
      lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1226

    
1227
    self.do_grouplocks = (self.do_locking and
1228
                          query.IQ_NODES in self.requested_data)
1229

    
1230
  def DeclareLocks(self, lu, level):
1231
    if self.do_locking:
1232
      if level == locking.LEVEL_NODEGROUP and self.do_grouplocks:
1233
        assert not lu.needed_locks[locking.LEVEL_NODEGROUP]
1234

    
1235
        # Lock all groups used by instances optimistically; this requires going
1236
        # via the node before it's locked, requiring verification later on
1237
        lu.needed_locks[locking.LEVEL_NODEGROUP] = \
1238
          set(group_uuid
1239
              for instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
1240
              for group_uuid in lu.cfg.GetInstanceNodeGroups(instance_name))
1241
      elif level == locking.LEVEL_NODE:
1242
        lu._LockInstancesNodes() # pylint: disable=W0212
1243

    
1244
      elif level == locking.LEVEL_NETWORK:
1245
        lu.needed_locks[locking.LEVEL_NETWORK] = \
1246
          frozenset(net_uuid
1247
                    for instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
1248
                    for net_uuid in lu.cfg.GetInstanceNetworks(instance_name))
1249

    
1250
  @staticmethod
1251
  def _CheckGroupLocks(lu):
1252
    owned_instances = frozenset(lu.owned_locks(locking.LEVEL_INSTANCE))
1253
    owned_groups = frozenset(lu.owned_locks(locking.LEVEL_NODEGROUP))
1254

    
1255
    # Check if node groups for locked instances are still correct
1256
    for instance_name in owned_instances:
1257
      _CheckInstanceNodeGroups(lu.cfg, instance_name, owned_groups)
1258

    
1259
  def _GetQueryData(self, lu):
1260
    """Computes the list of instances and their attributes.
1261

1262
    """
1263
    if self.do_grouplocks:
1264
      self._CheckGroupLocks(lu)
1265

    
1266
    cluster = lu.cfg.GetClusterInfo()
1267
    all_info = lu.cfg.GetAllInstancesInfo()
1268

    
1269
    instance_names = self._GetNames(lu, all_info.keys(), locking.LEVEL_INSTANCE)
1270

    
1271
    instance_list = [all_info[name] for name in instance_names]
1272
    nodes = frozenset(itertools.chain(*(inst.all_nodes
1273
                                        for inst in instance_list)))
1274
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
1275
    bad_nodes = []
1276
    offline_nodes = []
1277
    wrongnode_inst = set()
1278

    
1279
    # Gather data as requested
1280
    if self.requested_data & set([query.IQ_LIVE, query.IQ_CONSOLE]):
1281
      live_data = {}
1282
      node_data = lu.rpc.call_all_instances_info(nodes, hv_list)
1283
      for name in nodes:
1284
        result = node_data[name]
1285
        if result.offline:
1286
          # offline nodes will be in both lists
1287
          assert result.fail_msg
1288
          offline_nodes.append(name)
1289
        if result.fail_msg:
1290
          bad_nodes.append(name)
1291
        elif result.payload:
1292
          for inst in result.payload:
1293
            if inst in all_info:
1294
              if all_info[inst].primary_node == name:
1295
                live_data.update(result.payload)
1296
              else:
1297
                wrongnode_inst.add(inst)
1298
            else:
1299
              # orphan instance; we don't list it here as we don't
1300
              # handle this case yet in the output of instance listing
1301
              logging.warning("Orphan instance '%s' found on node %s",
1302
                              inst, name)
1303
        # else no instance is alive
1304
    else:
1305
      live_data = {}
1306

    
1307
    if query.IQ_DISKUSAGE in self.requested_data:
1308
      gmi = ganeti.masterd.instance
1309
      disk_usage = dict((inst.name,
1310
                         gmi.ComputeDiskSize(inst.disk_template,
1311
                                             [{constants.IDISK_SIZE: disk.size}
1312
                                              for disk in inst.disks]))
1313
                        for inst in instance_list)
1314
    else:
1315
      disk_usage = None
1316

    
1317
    if query.IQ_CONSOLE in self.requested_data:
1318
      consinfo = {}
1319
      for inst in instance_list:
1320
        if inst.name in live_data:
1321
          # Instance is running
1322
          consinfo[inst.name] = _GetInstanceConsole(cluster, inst)
1323
        else:
1324
          consinfo[inst.name] = None
1325
      assert set(consinfo.keys()) == set(instance_names)
1326
    else:
1327
      consinfo = None
1328

    
1329
    if query.IQ_NODES in self.requested_data:
1330
      node_names = set(itertools.chain(*map(operator.attrgetter("all_nodes"),
1331
                                            instance_list)))
1332
      nodes = dict(lu.cfg.GetMultiNodeInfo(node_names))
1333
      groups = dict((uuid, lu.cfg.GetNodeGroup(uuid))
1334
                    for uuid in set(map(operator.attrgetter("group"),
1335
                                        nodes.values())))
1336
    else:
1337
      nodes = None
1338
      groups = None
1339

    
1340
    if query.IQ_NETWORKS in self.requested_data:
1341
      net_uuids = itertools.chain(*(lu.cfg.GetInstanceNetworks(i.name)
1342
                                    for i in instance_list))
1343
      networks = dict((uuid, lu.cfg.GetNetwork(uuid)) for uuid in net_uuids)
1344
    else:
1345
      networks = None
1346

    
1347
    return query.InstanceQueryData(instance_list, lu.cfg.GetClusterInfo(),
1348
                                   disk_usage, offline_nodes, bad_nodes,
1349
                                   live_data, wrongnode_inst, consinfo,
1350
                                   nodes, groups, networks)
1351

    
1352

    
1353
class LUQuery(NoHooksLU):
1354
  """Query for resources/items of a certain kind.
1355

1356
  """
1357
  # pylint: disable=W0142
1358
  REQ_BGL = False
1359

    
1360
  def CheckArguments(self):
1361
    qcls = _GetQueryImplementation(self.op.what)
1362

    
1363
    self.impl = qcls(self.op.qfilter, self.op.fields, self.op.use_locking)
1364

    
1365
  def ExpandNames(self):
1366
    self.impl.ExpandNames(self)
1367

    
1368
  def DeclareLocks(self, level):
1369
    self.impl.DeclareLocks(self, level)
1370

    
1371
  def Exec(self, feedback_fn):
1372
    return self.impl.NewStyleQuery(self)
1373

    
1374

    
1375
class LUQueryFields(NoHooksLU):
1376
  """Query for resources/items of a certain kind.
1377

1378
  """
1379
  # pylint: disable=W0142
1380
  REQ_BGL = False
1381

    
1382
  def CheckArguments(self):
1383
    self.qcls = _GetQueryImplementation(self.op.what)
1384

    
1385
  def ExpandNames(self):
1386
    self.needed_locks = {}
1387

    
1388
  def Exec(self, feedback_fn):
1389
    return query.QueryFields(self.qcls.FIELDS, self.op.fields)
1390

    
1391

    
1392
class LUInstanceActivateDisks(NoHooksLU):
1393
  """Bring up an instance's disks.
1394

1395
  """
1396
  REQ_BGL = False
1397

    
1398
  def ExpandNames(self):
1399
    self._ExpandAndLockInstance()
1400
    self.needed_locks[locking.LEVEL_NODE] = []
1401
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1402

    
1403
  def DeclareLocks(self, level):
1404
    if level == locking.LEVEL_NODE:
1405
      self._LockInstancesNodes()
1406

    
1407
  def CheckPrereq(self):
1408
    """Check prerequisites.
1409

1410
    This checks that the instance is in the cluster.
1411

1412
    """
1413
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1414
    assert self.instance is not None, \
1415
      "Cannot retrieve locked instance %s" % self.op.instance_name
1416
    _CheckNodeOnline(self, self.instance.primary_node)
1417

    
1418
  def Exec(self, feedback_fn):
1419
    """Activate the disks.
1420

1421
    """
1422
    disks_ok, disks_info = \
1423
              _AssembleInstanceDisks(self, self.instance,
1424
                                     ignore_size=self.op.ignore_size)
1425
    if not disks_ok:
1426
      raise errors.OpExecError("Cannot activate block devices")
1427

    
1428
    if self.op.wait_for_sync:
1429
      if not _WaitForSync(self, self.instance):
1430
        raise errors.OpExecError("Some disks of the instance are degraded!")
1431

    
1432
    return disks_info
1433

    
1434

    
1435
def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1436
                           ignore_size=False):
1437
  """Prepare the block devices for an instance.
1438

1439
  This sets up the block devices on all nodes.
1440

1441
  @type lu: L{LogicalUnit}
1442
  @param lu: the logical unit on whose behalf we execute
1443
  @type instance: L{objects.Instance}
1444
  @param instance: the instance for whose disks we assemble
1445
  @type disks: list of L{objects.Disk} or None
1446
  @param disks: which disks to assemble (or all, if None)
1447
  @type ignore_secondaries: boolean
1448
  @param ignore_secondaries: if true, errors on secondary nodes
1449
      won't result in an error return from the function
1450
  @type ignore_size: boolean
1451
  @param ignore_size: if true, the current known size of the disk
1452
      will not be used during the disk activation, useful for cases
1453
      when the size is wrong
1454
  @return: a pair (disks_ok, device_info), where device_info is a list of
1455
      (host, instance_visible_name, node_visible_name) tuples with the
1456
      mapping from node devices to instance devices
1457

1458
  """
1459
  device_info = []
1460
  disks_ok = True
1461
  iname = instance.name
1462
  disks = _ExpandCheckDisks(instance, disks)
1463

    
1464
  # With the two passes mechanism we try to reduce the window of
1465
  # opportunity for the race condition of switching DRBD to primary
1466
  # before handshaking occured, but we do not eliminate it
1467

    
1468
  # The proper fix would be to wait (with some limits) until the
1469
  # connection has been made and drbd transitions from WFConnection
1470
  # into any other network-connected state (Connected, SyncTarget,
1471
  # SyncSource, etc.)
1472

    
1473
  # 1st pass, assemble on all nodes in secondary mode
1474
  for idx, inst_disk in enumerate(disks):
1475
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1476
      if ignore_size:
1477
        node_disk = node_disk.Copy()
1478
        node_disk.UnsetSize()
1479
      lu.cfg.SetDiskID(node_disk, node)
1480
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1481
                                             False, idx)
1482
      msg = result.fail_msg
1483
      if msg:
1484
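        # An assembly error on an offline secondary is tolerated: the node
        # cannot be reached anyway, so it is not counted against disks_ok.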
        is_offline_secondary = (node in instance.secondary_nodes and
1485
                                result.offline)
1486
        lu.LogWarning("Could not prepare block device %s on node %s"
1487
                      " (is_primary=False, pass=1): %s",
1488
                      inst_disk.iv_name, node, msg)
1489
        if not (ignore_secondaries or is_offline_secondary):
1490
          disks_ok = False
1491

    
1492
  # FIXME: race condition on drbd migration to primary
1493

    
1494
  # 2nd pass, do only the primary node
1495
  for idx, inst_disk in enumerate(disks):
1496
    dev_path = None
1497

    
1498
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1499
      if node != instance.primary_node:
1500
        continue
1501
      if ignore_size:
1502
        node_disk = node_disk.Copy()
1503
        node_disk.UnsetSize()
1504
      lu.cfg.SetDiskID(node_disk, node)
1505
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1506
                                             True, idx)
1507
      msg = result.fail_msg
1508
      if msg:
1509
        lu.LogWarning("Could not prepare block device %s on node %s"
1510
                      " (is_primary=True, pass=2): %s",
1511
                      inst_disk.iv_name, node, msg)
1512
        disks_ok = False
1513
      else:
1514
        dev_path = result.payload
1515

    
1516
    device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
1517

    
1518
  # leave the disks configured for the primary node
1519
  # this is a workaround that would be fixed better by
1520
  # improving the logical/physical id handling
1521
  for disk in disks:
1522
    lu.cfg.SetDiskID(disk, instance.primary_node)
1523

    
1524
  return disks_ok, device_info
1525

    
1526

    
1527
def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = _AssembleInstanceDisks(lu, instance,
                                       ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.LogWarning("",
                    hint=("If the message above refers to a secondary node,"
                          " you can retry the operation using '--force'"))
    raise errors.OpExecError("Disk consistency error")


class LUInstanceDeactivateDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    if self.op.force:
      _ShutdownInstanceDisks(self, instance)
    else:
      _SafeShutdownInstanceDisks(self, instance)


def _SafeShutdownInstanceDisks(lu, instance, disks=None):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  _CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
  _ShutdownInstanceDisks(lu, instance, disks=disks)


def _ExpandCheckDisks(instance, disks):
  """Return the instance disks selected by the disks list

  @type disks: list of L{objects.Disk} or None
  @param disks: selected disks
  @rtype: list of L{objects.Disk}
  @return: selected instance disks to act on

  """
  if disks is None:
    return instance.disks
  else:
    if not set(disks).issubset(instance.disks):
      raise errors.ProgrammerError("Can only act on disks belonging to the"
                                   " target instance")
    return disks
1605

    
1606

    
1607
def _ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1608
  """Shutdown block devices of an instance.
1609

1610
  This does the shutdown on all nodes of the instance.
1611

1612
  If ignore_primary is false, errors on the primary node are not
  ignored and cause the function to return a failed result.
1614

1615
  """
1616
  all_result = True
1617
  disks = _ExpandCheckDisks(instance, disks)
1618

    
1619
  for disk in disks:
1620
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1621
      lu.cfg.SetDiskID(top_disk, node)
1622
      result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
1623
      msg = result.fail_msg
1624
      if msg:
1625
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1626
                      disk.iv_name, node, msg)
1627
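        # Only failures on the primary node (unless ignore_primary is set)
        # or on an online secondary node mark the overall result as failed.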
        if ((node == instance.primary_node and not ignore_primary) or
1628
            (node != instance.primary_node and not result.offline)):
1629
          all_result = False
1630
  return all_result
1631

    
1632

    
1633
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
1634
  """Checks if a node has enough free memory.
1635

1636
  This function checks if a given node has the needed amount of free
1637
  memory. In case the node has less memory or we cannot get the
1638
  information from the node, this function raises an OpPrereqError
1639
  exception.
1640

1641
  @type lu: C{LogicalUnit}
1642
  @param lu: a logical unit from which we get configuration data
1643
  @type node: C{str}
1644
  @param node: the node to check
1645
  @type reason: C{str}
1646
  @param reason: string to use in the error message
1647
  @type requested: C{int}
1648
  @param requested: the amount of memory in MiB to check for
1649
  @type hypervisor_name: C{str}
1650
  @param hypervisor_name: the hypervisor to ask for memory stats
1651
  @rtype: integer
1652
  @return: node current free memory
1653
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
1654
      we cannot check the node
1655

1656
  """
1657
  nodeinfo = lu.rpc.call_node_info([node], None, [hypervisor_name], False)
1658
  nodeinfo[node].Raise("Can't get data from node %s" % node,
1659
                       prereq=True, ecode=errors.ECODE_ENVIRON)
1660
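  # A single hypervisor was requested above, so the per-hypervisor result
  # list contains exactly one entry.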
  (_, _, (hv_info, )) = nodeinfo[node].payload
1661

    
1662
  free_mem = hv_info.get("memory_free", None)
1663
  if not isinstance(free_mem, int):
1664
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
1665
                               " was '%s'" % (node, free_mem),
1666
                               errors.ECODE_ENVIRON)
1667
  if requested > free_mem:
1668
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
1669
                               " needed %s MiB, available %s MiB" %
1670
                               (node, reason, requested, free_mem),
1671
                               errors.ECODE_NORES)
1672
  return free_mem
1673

    
1674

    
1675
def _CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
1676
  """Checks if nodes have enough free disk space in all the VGs.
1677

1678
  This function checks if all given nodes have the needed amount of
1679
  free disk. In case any node has less disk or we cannot get the
1680
  information from the node, this function raises an OpPrereqError
1681
  exception.
1682

1683
  @type lu: C{LogicalUnit}
1684
  @param lu: a logical unit from which we get configuration data
1685
  @type nodenames: C{list}
1686
  @param nodenames: the list of node names to check
1687
  @type req_sizes: C{dict}
1688
  @param req_sizes: the hash of vg and corresponding amount of disk in
1689
      MiB to check for
1690
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
1691
      or we cannot check the node
1692

1693
  """
1694
  for vg, req_size in req_sizes.items():
1695
    _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
1696

    
1697

    
1698
def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
1699
  """Checks if nodes have enough free disk space in the specified VG.
1700

1701
  This function checks if all given nodes have the needed amount of
1702
  free disk. In case any node has less disk or we cannot get the
1703
  information from the node, this function raises an OpPrereqError
1704
  exception.
1705

1706
  @type lu: C{LogicalUnit}
1707
  @param lu: a logical unit from which we get configuration data
1708
  @type nodenames: C{list}
1709
  @param nodenames: the list of node names to check
1710
  @type vg: C{str}
1711
  @param vg: the volume group to check
1712
  @type requested: C{int}
1713
  @param requested: the amount of disk in MiB to check for
1714
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
1715
      or we cannot check the node
1716

1717
  """
1718
  es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
1719
  nodeinfo = lu.rpc.call_node_info(nodenames, [vg], None, es_flags)
1720
  for node in nodenames:
1721
    info = nodeinfo[node]
1722
    info.Raise("Cannot get current information from node %s" % node,
1723
               prereq=True, ecode=errors.ECODE_ENVIRON)
1724
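    # Exactly one volume group was requested, hence the single-element
    # unpack of the volume group information below.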
    (_, (vg_info, ), _) = info.payload
1725
    vg_free = vg_info.get("vg_free", None)
1726
    if not isinstance(vg_free, int):
1727
      raise errors.OpPrereqError("Can't compute free disk space on node"
1728
                                 " %s for vg %s, result was '%s'" %
1729
                                 (node, vg, vg_free), errors.ECODE_ENVIRON)
1730
    if requested > vg_free:
1731
      raise errors.OpPrereqError("Not enough disk space on target node %s"
1732
                                 " vg %s: required %d MiB, available %d MiB" %
1733
                                 (node, vg, requested, vg_free),
1734
                                 errors.ECODE_NORES)
1735

    
1736

    
1737
def _CheckNodesPhysicalCPUs(lu, nodenames, requested, hypervisor_name):
1738
  """Checks if nodes have enough physical CPUs
1739

1740
  This function checks if all given nodes have the needed number of
1741
  physical CPUs. In case any node has less CPUs or we cannot get the
1742
  information from the node, this function raises an OpPrereqError
1743
  exception.
1744

1745
  @type lu: C{LogicalUnit}
1746
  @param lu: a logical unit from which we get configuration data
1747
  @type nodenames: C{list}
1748
  @param nodenames: the list of node names to check
1749
  @type requested: C{int}
1750
  @param requested: the minimum acceptable number of physical CPUs
1751
  @raise errors.OpPrereqError: if the node doesn't have enough CPUs,
1752
      or we cannot check the node
1753

1754
  """
1755
  nodeinfo = lu.rpc.call_node_info(nodenames, None, [hypervisor_name], None)
1756
  for node in nodenames:
1757
    info = nodeinfo[node]
1758
    info.Raise("Cannot get current information from node %s" % node,
1759
               prereq=True, ecode=errors.ECODE_ENVIRON)
1760
    (_, _, (hv_info, )) = info.payload
1761
    num_cpus = hv_info.get("cpu_total", None)
1762
    if not isinstance(num_cpus, int):
1763
      raise errors.OpPrereqError("Can't compute the number of physical CPUs"
1764
                                 " on node %s, result was '%s'" %
1765
                                 (node, num_cpus), errors.ECODE_ENVIRON)
1766
    if requested > num_cpus:
1767
      raise errors.OpPrereqError("Node %s has %s physical CPUs, but %s are "
1768
                                 "required" % (node, num_cpus, requested),
1769
                                 errors.ECODE_NORES)
1770

    
1771

    
1772
class LUInstanceStartup(LogicalUnit):
1773
  """Starts an instance.
1774

1775
  """
1776
  HPATH = "instance-start"
1777
  HTYPE = constants.HTYPE_INSTANCE
1778
  REQ_BGL = False
1779

    
1780
  def CheckArguments(self):
1781
    # extra beparams
1782
    if self.op.beparams:
1783
      # fill the beparams dict
1784
      objects.UpgradeBeParams(self.op.beparams)
1785
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1786

    
1787
  def ExpandNames(self):
1788
    self._ExpandAndLockInstance()
1789
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
1790

    
1791
  def DeclareLocks(self, level):
1792
    if level == locking.LEVEL_NODE_RES:
1793
      self._LockInstancesNodes(primary_only=True, level=locking.LEVEL_NODE_RES)
1794

    
1795
  def BuildHooksEnv(self):
1796
    """Build hooks env.
1797

1798
    This runs on master, primary and secondary nodes of the instance.
1799

1800
    """
1801
    env = {
1802
      "FORCE": self.op.force,
1803
      }
1804

    
1805
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
1806

    
1807
    return env
1808

    
1809
  def BuildHooksNodes(self):
1810
    """Build hooks nodes.
1811

1812
    """
1813
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1814
    return (nl, nl)
1815

    
1816
  def CheckPrereq(self):
1817
    """Check prerequisites.
1818

1819
    This checks that the instance is in the cluster.
1820

1821
    """
1822
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1823
    assert self.instance is not None, \
1824
      "Cannot retrieve locked instance %s" % self.op.instance_name
1825

    
1826
    # extra hvparams
1827
    if self.op.hvparams:
1828
      # check hypervisor parameter syntax (locally)
1829
      cluster = self.cfg.GetClusterInfo()
1830
      utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
1831
      filled_hvp = cluster.FillHV(instance)
1832
      filled_hvp.update(self.op.hvparams)
1833
      hv_type = hypervisor.GetHypervisorClass(instance.hypervisor)
1834
      hv_type.CheckParameterSyntax(filled_hvp)
1835
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
1836

    
1837
    _CheckInstanceState(self, instance, INSTANCE_ONLINE)
1838

    
1839
    self.primary_offline = self.cfg.GetNodeInfo(instance.primary_node).offline
1840

    
1841
    if self.primary_offline and self.op.ignore_offline_nodes:
1842
      self.LogWarning("Ignoring offline primary node")
1843

    
1844
      if self.op.hvparams or self.op.beparams:
1845
        self.LogWarning("Overridden parameters are ignored")
1846
    else:
1847
      _CheckNodeOnline(self, instance.primary_node)
1848

    
1849
      bep = self.cfg.GetClusterInfo().FillBE(instance)
1850
      bep.update(self.op.beparams)
1851

    
1852
      # check bridges existence
1853
      _CheckInstanceBridgesExist(self, instance)
1854

    
1855
      remote_info = self.rpc.call_instance_info(instance.primary_node,
1856
                                                instance.name,
1857
                                                instance.hypervisor)
1858
      remote_info.Raise("Error checking node %s" % instance.primary_node,
1859
                        prereq=True, ecode=errors.ECODE_ENVIRON)
1860
      if not remote_info.payload: # not running already
1861
        _CheckNodeFreeMemory(self, instance.primary_node,
1862
                             "starting instance %s" % instance.name,
1863
                             bep[constants.BE_MINMEM], instance.hypervisor)
1864

    
1865
  def Exec(self, feedback_fn):
1866
    """Start the instance.
1867

1868
    """
1869
    instance = self.instance
1870
    force = self.op.force
1871
    reason = self.op.reason
1872

    
1873
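    # Record the instance as running before actually starting it (unless
    # no_remember is set); if the startup below fails, the disks are shut
    # down again but the recorded state stays "up".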
    if not self.op.no_remember:
1874
      self.cfg.MarkInstanceUp(instance.name)
1875

    
1876
    if self.primary_offline:
1877
      assert self.op.ignore_offline_nodes
1878
      self.LogInfo("Primary node offline, marked instance as started")
1879
    else:
1880
      node_current = instance.primary_node
1881

    
1882
      _StartInstanceDisks(self, instance, force)
1883

    
1884
      result = \
1885
        self.rpc.call_instance_start(node_current,
1886
                                     (instance, self.op.hvparams,
1887
                                      self.op.beparams),
1888
                                     self.op.startup_paused, reason)
1889
      msg = result.fail_msg
1890
      if msg:
1891
        _ShutdownInstanceDisks(self, instance)
1892
        raise errors.OpExecError("Could not start instance: %s" % msg)
1893

    
1894

    
1895
class LUInstanceReboot(LogicalUnit):
1896
  """Reboot an instance.
1897

1898
  """
1899
  HPATH = "instance-reboot"
1900
  HTYPE = constants.HTYPE_INSTANCE
1901
  REQ_BGL = False
1902

    
1903
  def ExpandNames(self):
1904
    self._ExpandAndLockInstance()
1905

    
1906
  def BuildHooksEnv(self):
1907
    """Build hooks env.
1908

1909
    This runs on master, primary and secondary nodes of the instance.
1910

1911
    """
1912
    env = {
1913
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
1914
      "REBOOT_TYPE": self.op.reboot_type,
1915
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
1916
      }
1917

    
1918
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
1919

    
1920
    return env
1921

    
1922
  def BuildHooksNodes(self):
1923
    """Build hooks nodes.
1924

1925
    """
1926
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1927
    return (nl, nl)
1928

    
1929
  def CheckPrereq(self):
1930
    """Check prerequisites.
1931

1932
    This checks that the instance is in the cluster.
1933

1934
    """
1935
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1936
    assert self.instance is not None, \
1937
      "Cannot retrieve locked instance %s" % self.op.instance_name
1938
    _CheckInstanceState(self, instance, INSTANCE_ONLINE)
1939
    _CheckNodeOnline(self, instance.primary_node)
1940

    
1941
    # check bridges existence
1942
    _CheckInstanceBridgesExist(self, instance)
1943

    
1944
  def Exec(self, feedback_fn):
1945
    """Reboot the instance.
1946

1947
    """
1948
    instance = self.instance
1949
    ignore_secondaries = self.op.ignore_secondaries
1950
    reboot_type = self.op.reboot_type
1951
    reason = self.op.reason
1952

    
1953
    remote_info = self.rpc.call_instance_info(instance.primary_node,
1954
                                              instance.name,
1955
                                              instance.hypervisor)
1956
    remote_info.Raise("Error checking node %s" % instance.primary_node)
1957
    instance_running = bool(remote_info.payload)
1958

    
1959
    node_current = instance.primary_node
1960

    
1961
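    # Soft and hard reboots of a running instance are delegated to the
    # hypervisor on the primary node; a full reboot (or a stopped
    # instance) goes through the shutdown/start path below.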
    if instance_running and reboot_type in [constants.INSTANCE_REBOOT_SOFT,
1962
                                            constants.INSTANCE_REBOOT_HARD]:
1963
      for disk in instance.disks:
1964
        self.cfg.SetDiskID(disk, node_current)
1965
      result = self.rpc.call_instance_reboot(node_current, instance,
1966
                                             reboot_type,
1967
                                             self.op.shutdown_timeout, reason)
1968
      result.Raise("Could not reboot instance")
1969
    else:
1970
      if instance_running:
1971
        result = self.rpc.call_instance_shutdown(node_current, instance,
1972
                                                 self.op.shutdown_timeout,
1973
                                                 reason)
1974
        result.Raise("Could not shutdown instance for full reboot")
1975
        _ShutdownInstanceDisks(self, instance)
1976
      else:
1977
        self.LogInfo("Instance %s was already stopped, starting now",
1978
                     instance.name)
1979
      _StartInstanceDisks(self, instance, ignore_secondaries)
1980
      result = self.rpc.call_instance_start(node_current,
1981
                                            (instance, None, None), False,
1982
                                             reason)
1983
      msg = result.fail_msg
1984
      if msg:
1985
        _ShutdownInstanceDisks(self, instance)
1986
        raise errors.OpExecError("Could not start instance for"
1987
                                 " full reboot: %s" % msg)
1988

    
1989
    self.cfg.MarkInstanceUp(instance.name)
1990

    
1991

    
1992
class LUInstanceShutdown(LogicalUnit):
1993
  """Shutdown an instance.
1994

1995
  """
1996
  HPATH = "instance-stop"
1997
  HTYPE = constants.HTYPE_INSTANCE
1998
  REQ_BGL = False
1999

    
2000
  def ExpandNames(self):
2001
    self._ExpandAndLockInstance()
2002

    
2003
  def BuildHooksEnv(self):
2004
    """Build hooks env.
2005

2006
    This runs on master, primary and secondary nodes of the instance.
2007

2008
    """
2009
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2010
    env["TIMEOUT"] = self.op.timeout
2011
    return env
2012

    
2013
  def BuildHooksNodes(self):
2014
    """Build hooks nodes.
2015

2016
    """
2017
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2018
    return (nl, nl)
2019

    
2020
  def CheckPrereq(self):
2021
    """Check prerequisites.
2022

2023
    This checks that the instance is in the cluster.
2024

2025
    """
2026
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2027
    assert self.instance is not None, \
2028
      "Cannot retrieve locked instance %s" % self.op.instance_name
2029

    
2030
    if not self.op.force:
2031
      _CheckInstanceState(self, self.instance, INSTANCE_ONLINE)
2032
    else:
2033
      self.LogWarning("Ignoring offline instance check")
2034

    
2035
    self.primary_offline = \
2036
      self.cfg.GetNodeInfo(self.instance.primary_node).offline
2037

    
2038
    if self.primary_offline and self.op.ignore_offline_nodes:
2039
      self.LogWarning("Ignoring offline primary node")
2040
    else:
2041
      _CheckNodeOnline(self, self.instance.primary_node)
2042

    
2043
  def Exec(self, feedback_fn):
2044
    """Shutdown the instance.
2045

2046
    """
2047
    instance = self.instance
2048
    node_current = instance.primary_node
2049
    timeout = self.op.timeout
2050
    reason = self.op.reason
2051

    
2052
    # If the instance is offline we shouldn't mark it as down, as that
2053
    # resets the offline flag.
2054
    if not self.op.no_remember and instance.admin_state in INSTANCE_ONLINE:
2055
      self.cfg.MarkInstanceDown(instance.name)
2056

    
2057
    if self.primary_offline:
2058
      assert self.op.ignore_offline_nodes
2059
      self.LogInfo("Primary node offline, marked instance as stopped")
2060
    else:
2061
      result = self.rpc.call_instance_shutdown(node_current, instance, timeout,
2062
                                               reason)
2063
      msg = result.fail_msg
2064
      if msg:
2065
        self.LogWarning("Could not shutdown instance: %s", msg)
2066

    
2067
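      # Shut down the disks even if the instance shutdown above only
      # resulted in a warning.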
      _ShutdownInstanceDisks(self, instance)
2068

    
2069

    
2070
class LUInstanceReinstall(LogicalUnit):
2071
  """Reinstall an instance.
2072

2073
  """
2074
  HPATH = "instance-reinstall"
2075
  HTYPE = constants.HTYPE_INSTANCE
2076
  REQ_BGL = False
2077

    
2078
  def ExpandNames(self):
2079
    self._ExpandAndLockInstance()
2080

    
2081
  def BuildHooksEnv(self):
2082
    """Build hooks env.
2083

2084
    This runs on master, primary and secondary nodes of the instance.
2085

2086
    """
2087
    return _BuildInstanceHookEnvByObject(self, self.instance)
2088

    
2089
  def BuildHooksNodes(self):
2090
    """Build hooks nodes.
2091

2092
    """
2093
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2094
    return (nl, nl)
2095

    
2096
  def CheckPrereq(self):
2097
    """Check prerequisites.
2098

2099
    This checks that the instance is in the cluster and is not running.
2100

2101
    """
2102
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2103
    assert instance is not None, \
2104
      "Cannot retrieve locked instance %s" % self.op.instance_name
2105
    _CheckNodeOnline(self, instance.primary_node, "Instance primary node"
2106
                     " offline, cannot reinstall")
2107

    
2108
    if instance.disk_template == constants.DT_DISKLESS:
2109
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2110
                                 self.op.instance_name,
2111
                                 errors.ECODE_INVAL)
2112
    _CheckInstanceState(self, instance, INSTANCE_DOWN, msg="cannot reinstall")
2113

    
2114
    if self.op.os_type is not None:
2115
      # OS verification
2116
      pnode = _ExpandNodeName(self.cfg, instance.primary_node)
2117
      _CheckNodeHasOS(self, pnode, self.op.os_type, self.op.force_variant)
2118
      instance_os = self.op.os_type
2119
    else:
2120
      instance_os = instance.os
2121

    
2122
    nodelist = list(instance.all_nodes)
2123

    
2124
    if self.op.osparams:
2125
      i_osdict = _GetUpdatedParams(instance.osparams, self.op.osparams)
2126
      _CheckOSParams(self, True, nodelist, instance_os, i_osdict)
2127
      self.os_inst = i_osdict # the new dict (without defaults)
2128
    else:
2129
      self.os_inst = None
2130

    
2131
    self.instance = instance
2132

    
2133
  def Exec(self, feedback_fn):
2134
    """Reinstall the instance.
2135

2136
    """
2137
    inst = self.instance
2138

    
2139
    if self.op.os_type is not None:
2140
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2141
      inst.os = self.op.os_type
2142
      # Write to configuration
2143
      self.cfg.Update(inst, feedback_fn)
2144

    
2145
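    # The OS create scripts need access to the instance's disks, so
    # activate them here; the finally clause shuts them down again.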
    _StartInstanceDisks(self, inst, None)
2146
    try:
2147
      feedback_fn("Running the instance OS create scripts...")
2148
      # FIXME: pass debug option from opcode to backend
2149
      result = self.rpc.call_instance_os_add(inst.primary_node,
2150
                                             (inst, self.os_inst), True,
2151
                                             self.op.debug_level)
2152
      result.Raise("Could not install OS for instance %s on node %s" %
2153
                   (inst.name, inst.primary_node))
2154
    finally:
2155
      _ShutdownInstanceDisks(self, inst)
2156

    
2157

    
2158
class LUInstanceRecreateDisks(LogicalUnit):
2159
  """Recreate an instance's missing disks.
2160

2161
  """
2162
  HPATH = "instance-recreate-disks"
2163
  HTYPE = constants.HTYPE_INSTANCE
2164
  REQ_BGL = False
2165

    
2166
  _MODIFYABLE = compat.UniqueFrozenset([
2167
    constants.IDISK_SIZE,
2168
    constants.IDISK_MODE,
2169
    ])
2170

    
2171
  # New or changed disk parameters may have different semantics
2172
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
2173
    constants.IDISK_ADOPT,
2174

    
2175
    # TODO: Implement support changing VG while recreating
2176
    constants.IDISK_VG,
2177
    constants.IDISK_METAVG,
2178
    constants.IDISK_PROVIDER,
2179
    constants.IDISK_NAME,
2180
    ]))
2181

    
2182
  def _RunAllocator(self):
2183
    """Run the allocator based on input opcode.
2184

2185
    """
2186
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
2187

    
2188
    # FIXME
2189
    # The allocator should actually run in "relocate" mode, but current
2190
    # allocators don't support relocating all the nodes of an instance at
2191
    # the same time. As a workaround we use "allocate" mode, but this is
2192
    # suboptimal for two reasons:
2193
    # - The instance name passed to the allocator is present in the list of
2194
    #   existing instances, so there could be a conflict within the
2195
    #   internal structures of the allocator. This doesn't happen with the
2196
    #   current allocators, but it's a liability.
2197
    # - The allocator counts the resources used by the instance twice: once
2198
    #   because the instance exists already, and once because it tries to
2199
    #   allocate a new instance.
2200
    # The allocator could choose some of the nodes on which the instance is
2201
    # running, but that's not a problem. If the instance nodes are broken,
2202
    # they should be already be marked as drained or offline, and hence
2203
    # skipped by the allocator. If instance disks have been lost for other
2204
    # reasons, then recreating the disks on the same nodes should be fine.
2205
    disk_template = self.instance.disk_template
2206
    spindle_use = be_full[constants.BE_SPINDLE_USE]
2207
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
2208
                                        disk_template=disk_template,
2209
                                        tags=list(self.instance.GetTags()),
2210
                                        os=self.instance.os,
2211
                                        nics=[{}],
2212
                                        vcpus=be_full[constants.BE_VCPUS],
2213
                                        memory=be_full[constants.BE_MAXMEM],
2214
                                        spindle_use=spindle_use,
2215
                                        disks=[{constants.IDISK_SIZE: d.size,
2216
                                                constants.IDISK_MODE: d.mode}
2217
                                                for d in self.instance.disks],
2218
                                        hypervisor=self.instance.hypervisor,
2219
                                        node_whitelist=None)
2220
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
2221

    
2222
    ial.Run(self.op.iallocator)
2223

    
2224
    assert req.RequiredNodes() == len(self.instance.all_nodes)
2225

    
2226
    if not ial.success:
2227
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
2228
                                 " %s" % (self.op.iallocator, ial.info),
2229
                                 errors.ECODE_NORES)
2230

    
2231
    self.op.nodes = ial.result
2232
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
2233
                 self.op.instance_name, self.op.iallocator,
2234
                 utils.CommaJoin(ial.result))
2235

    
2236
  def CheckArguments(self):
2237
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
2238
      # Normalize and convert deprecated list of disk indices
2239
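      # (the deprecated form is a plain list of indices; the current form
      # is a list of (index, parameters) pairs)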
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
2240

    
2241
    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
2242
    if duplicates:
2243
      raise errors.OpPrereqError("Some disks have been specified more than"
2244
                                 " once: %s" % utils.CommaJoin(duplicates),
2245
                                 errors.ECODE_INVAL)
2246

    
2247
    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
2248
    # when neither iallocator nor nodes are specified
2249
    if self.op.iallocator or self.op.nodes:
2250
      _CheckIAllocatorOrNode(self, "iallocator", "nodes")
2251

    
2252
    for (idx, params) in self.op.disks:
2253
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
2254
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
2255
      if unsupported:
2256
        raise errors.OpPrereqError("Parameters for disk %s try to change"
2257
                                   " unmodifyable parameter(s): %s" %
2258
                                   (idx, utils.CommaJoin(unsupported)),
2259
                                   errors.ECODE_INVAL)
2260

    
2261
  def ExpandNames(self):
2262
    self._ExpandAndLockInstance()
2263
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
2264

    
2265
    if self.op.nodes:
2266
      self.op.nodes = [_ExpandNodeName(self.cfg, n) for n in self.op.nodes]
2267
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
2268
    else:
2269
      self.needed_locks[locking.LEVEL_NODE] = []
2270
      if self.op.iallocator:
2271
        # iallocator will select a new node in the same group
2272
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
2273
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
2274

    
2275
    self.needed_locks[locking.LEVEL_NODE_RES] = []
2276

    
2277
  def DeclareLocks(self, level):
2278
    if level == locking.LEVEL_NODEGROUP:
2279
      assert self.op.iallocator is not None
2280
      assert not self.op.nodes
2281
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
2282
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
2283
      # Lock the primary group used by the instance optimistically; this
2284
      # requires going via the node before it's locked, requiring
2285
      # verification later on
2286
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
2287
        self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)
2288

    
2289
    elif level == locking.LEVEL_NODE:
2290
      # If an allocator is used, then we lock all the nodes in the current
2291
      # instance group, as we don't know yet which ones will be selected;
2292
      # if we replace the nodes without using an allocator, locks are
2293
      # already declared in ExpandNames; otherwise, we need to lock all the
2294
      # instance nodes for disk re-creation
2295
      if self.op.iallocator:
2296
        assert not self.op.nodes
2297
        assert not self.needed_locks[locking.LEVEL_NODE]
2298
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
2299

    
2300
        # Lock member nodes of the group of the primary node
2301
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
2302
          self.needed_locks[locking.LEVEL_NODE].extend(
2303
            self.cfg.GetNodeGroup(group_uuid).members)
2304

    
2305
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
2306
      elif not self.op.nodes:
2307
        self._LockInstancesNodes(primary_only=False)
2308
    elif level == locking.LEVEL_NODE_RES:
2309
      # Copy node locks
2310
      self.needed_locks[locking.LEVEL_NODE_RES] = \
2311
        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
2312

    
2313
  def BuildHooksEnv(self):
2314
    """Build hooks env.
2315

2316
    This runs on master, primary and secondary nodes of the instance.
2317

2318
    """
2319
    return _BuildInstanceHookEnvByObject(self, self.instance)
2320

    
2321
  def BuildHooksNodes(self):
2322
    """Build hooks nodes.
2323

2324
    """
2325
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2326
    return (nl, nl)
2327

    
2328
  def CheckPrereq(self):
2329
    """Check prerequisites.
2330

2331
    This checks that the instance is in the cluster and is not running.
2332

2333
    """
2334
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2335
    assert instance is not None, \
2336
      "Cannot retrieve locked instance %s" % self.op.instance_name
2337
    if self.op.nodes:
2338
      if len(self.op.nodes) != len(instance.all_nodes):
2339
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
2340
                                   " %d replacement nodes were specified" %
2341
                                   (instance.name, len(instance.all_nodes),
2342
                                    len(self.op.nodes)),
2343
                                   errors.ECODE_INVAL)
2344
      assert instance.disk_template != constants.DT_DRBD8 or \
2345
          len(self.op.nodes) == 2
2346
      assert instance.disk_template != constants.DT_PLAIN or \
2347
          len(self.op.nodes) == 1
2348
      primary_node = self.op.nodes[0]
2349
    else:
2350
      primary_node = instance.primary_node
2351
    if not self.op.iallocator:
2352
      _CheckNodeOnline(self, primary_node)
2353

    
2354
    if instance.disk_template == constants.DT_DISKLESS:
2355
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2356
                                 self.op.instance_name, errors.ECODE_INVAL)
2357

    
2358
    # Verify if node group locks are still correct
2359
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
2360
    if owned_groups:
2361
      # Node group locks are acquired only for the primary node (and only
2362
      # when the allocator is used)
2363
      _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
2364
                               primary_only=True)
2365

    
2366
    # if we replace nodes *and* the old primary is offline, we don't
2367
    # check the instance state
2368
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
2369
    if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
2370
      _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
2371
                          msg="cannot recreate disks")
2372

    
2373
    if self.op.disks:
2374
      self.disks = dict(self.op.disks)
2375
    else:
2376
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
2377

    
2378
    maxidx = max(self.disks.keys())
2379
    if maxidx >= len(instance.disks):
2380
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
2381
                                 errors.ECODE_INVAL)
2382

    
2383
    if ((self.op.nodes or self.op.iallocator) and
2384
        sorted(self.disks.keys()) != range(len(instance.disks))):
2385
      raise errors.OpPrereqError("Can't recreate disks partially and"
2386
                                 " change the nodes at the same time",
2387
                                 errors.ECODE_INVAL)
2388

    
2389
    self.instance = instance
2390

    
2391
    if self.op.iallocator:
2392
      self._RunAllocator()
2393
      # Release unneeded node and node resource locks
2394
      _ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
2395
      _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
2396
      _ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
2397

    
2398
    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
2399

    
2400
  def Exec(self, feedback_fn):
2401
    """Recreate the disks.
2402

2403
    """
2404
    instance = self.instance
2405

    
2406
    assert (self.owned_locks(locking.LEVEL_NODE) ==
2407
            self.owned_locks(locking.LEVEL_NODE_RES))
2408

    
2409
    to_skip = []
2410
    mods = [] # keeps track of needed changes
2411

    
2412
    for idx, disk in enumerate(instance.disks):
2413
      try:
2414
        changes = self.disks[idx]
2415
      except KeyError:
2416
        # Disk should not be recreated
2417
        to_skip.append(idx)
2418
        continue
2419

    
2420
      # update secondaries for disks, if needed
2421
      if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
2422
        # need to update the nodes and minors
2423
        assert len(self.op.nodes) == 2
2424
        assert len(disk.logical_id) == 6 # otherwise disk internals
2425
                                         # have changed
2426
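        # The DRBD8 logical_id is (node_a, node_b, port, minor_a, minor_b,
        # secret); the port and secret are kept, nodes and minors change.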
        (_, _, old_port, _, _, old_secret) = disk.logical_id
2427
        new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
2428
        new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
2429
                  new_minors[0], new_minors[1], old_secret)
2430
        assert len(disk.logical_id) == len(new_id)
2431
      else:
2432
        new_id = None
2433

    
2434
      mods.append((idx, new_id, changes))
2435

    
2436
    # now that we have passed all asserts above, we can apply the mods
2437
    # in a single run (to avoid partial changes)
2438
    for idx, new_id, changes in mods:
2439
      disk = instance.disks[idx]
2440
      if new_id is not None:
2441
        assert disk.dev_type == constants.LD_DRBD8
2442
        disk.logical_id = new_id
2443
      if changes:
2444
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
2445
                    mode=changes.get(constants.IDISK_MODE, None))
2446

    
2447
    # change primary node, if needed
2448
    if self.op.nodes:
2449
      instance.primary_node = self.op.nodes[0]
2450
      self.LogWarning("Changing the instance's nodes, you will have to"
2451
                      " remove any disks left on the older nodes manually")
2452

    
2453
    if self.op.nodes:
2454
      self.cfg.Update(instance, feedback_fn)
2455

    
2456
    # All touched nodes must be locked
2457
    mylocks = self.owned_locks(locking.LEVEL_NODE)
2458
    assert mylocks.issuperset(frozenset(instance.all_nodes))
2459
    _CreateDisks(self, instance, to_skip=to_skip)
2460

    
2461

    
2462
class LUInstanceRename(LogicalUnit):
2463
  """Rename an instance.
2464

2465
  """
2466
  HPATH = "instance-rename"
2467
  HTYPE = constants.HTYPE_INSTANCE
2468

    
2469
  def CheckArguments(self):
2470
    """Check arguments.
2471

2472
    """
2473
    if self.op.ip_check and not self.op.name_check:
2474
      # TODO: make the ip check more flexible and not depend on the name check
2475
      raise errors.OpPrereqError("IP address check requires a name check",
2476
                                 errors.ECODE_INVAL)
2477

    
2478
  def BuildHooksEnv(self):
2479
    """Build hooks env.
2480

2481
    This runs on master, primary and secondary nodes of the instance.
2482

2483
    """
2484
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2485
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2486
    return env
2487

    
2488
  def BuildHooksNodes(self):
2489
    """Build hooks nodes.
2490

2491
    """
2492
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2493
    return (nl, nl)
2494

    
2495
  def CheckPrereq(self):
2496
    """Check prerequisites.
2497

2498
    This checks that the instance is in the cluster and is not running.
2499

2500
    """
2501
    self.op.instance_name = _ExpandInstanceName(self.cfg,
2502
                                                self.op.instance_name)
2503
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2504
    assert instance is not None
2505
    _CheckNodeOnline(self, instance.primary_node)
2506
    _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
2507
                        msg="cannot rename")
2508
    self.instance = instance
2509

    
2510
    new_name = self.op.new_name
2511
    if self.op.name_check:
2512
      hostname = _CheckHostnameSane(self, new_name)
2513
      new_name = self.op.new_name = hostname.name
2514
      if (self.op.ip_check and
2515
          netutils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT)):
2516
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2517
                                   (hostname.ip, new_name),
2518
                                   errors.ECODE_NOTUNIQUE)
2519

    
2520
    instance_list = self.cfg.GetInstanceList()
2521
    if new_name in instance_list and new_name != instance.name:
2522
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2523
                                 new_name, errors.ECODE_EXISTS)
2524

    
2525
  def Exec(self, feedback_fn):
2526
    """Rename the instance.
2527

2528
    """
2529
    inst = self.instance
2530
    old_name = inst.name
2531

    
2532
    rename_file_storage = False
2533
    if (inst.disk_template in constants.DTS_FILEBASED and
2534
        self.op.new_name != inst.name):
2535
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2536
      rename_file_storage = True
2537

    
2538
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2539
    # Change the instance lock. This is definitely safe while we hold the BGL.
2540
    # Otherwise the new lock would have to be added in acquired mode.
2541
    assert self.REQ_BGL
2542
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER)
2543
    self.glm.remove(locking.LEVEL_INSTANCE, old_name)
2544
    self.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2545

    
2546
    # re-read the instance from the configuration after rename
2547
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2548

    
2549
    if rename_file_storage:
2550
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2551
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2552
                                                     old_file_storage_dir,
2553
                                                     new_file_storage_dir)
2554
      result.Raise("Could not rename on node %s directory '%s' to '%s'"
2555
                   " (but the instance has been renamed in Ganeti)" %
2556
                   (inst.primary_node, old_file_storage_dir,
2557
                    new_file_storage_dir))
2558

    
2559
    _StartInstanceDisks(self, inst, None)
2560
    # update info on disks
2561
    info = _GetInstanceInfoText(inst)
2562
    for (idx, disk) in enumerate(inst.disks):
2563
      for node in inst.all_nodes:
2564
        self.cfg.SetDiskID(disk, node)
2565
        result = self.rpc.call_blockdev_setinfo(node, disk, info)
2566
        if result.fail_msg:
2567
          self.LogWarning("Error setting info on node %s for disk %s: %s",
2568
                          node, idx, result.fail_msg)
2569
    try:
2570
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2571
                                                 old_name, self.op.debug_level)
2572
      msg = result.fail_msg
2573
      if msg:
2574
        msg = ("Could not run OS rename script for instance %s on node %s"
2575
               " (but the instance has been renamed in Ganeti): %s" %
2576
               (inst.name, inst.primary_node, msg))
2577
        self.LogWarning(msg)
2578
    finally:
2579
      _ShutdownInstanceDisks(self, inst)
2580

    
2581
    return inst.name
2582

    
2583

    
2584
class LUInstanceRemove(LogicalUnit):
2585
  """Remove an instance.
2586

2587
  """
2588
  HPATH = "instance-remove"
2589
  HTYPE = constants.HTYPE_INSTANCE
2590
  REQ_BGL = False
2591

    
2592
  def ExpandNames(self):
2593
    self._ExpandAndLockInstance()
2594
    self.needed_locks[locking.LEVEL_NODE] = []
2595
    self.needed_locks[locking.LEVEL_NODE_RES] = []
2596
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2597

    
2598
  def DeclareLocks(self, level):
2599
    if level == locking.LEVEL_NODE:
2600
      self._LockInstancesNodes()
2601
    elif level == locking.LEVEL_NODE_RES:
2602
      # Copy node locks
2603
      self.needed_locks[locking.LEVEL_NODE_RES] = \
2604
        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
2605

    
2606
  def BuildHooksEnv(self):
2607
    """Build hooks env.
2608

2609
    This runs on master, primary and secondary nodes of the instance.
2610

2611
    """
2612
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2613
    env["SHUTDOWN_TIMEOUT"] = self.op.shutdown_timeout
2614
    return env
2615

    
2616
  def BuildHooksNodes(self):
2617
    """Build hooks nodes.
2618

2619
    """
2620
    nl = [self.cfg.GetMasterNode()]
2621
    nl_post = list(self.instance.all_nodes) + nl
2622
    return (nl, nl_post)
2623

    
2624
  def CheckPrereq(self):
2625
    """Check prerequisites.
2626

2627
    This checks that the instance is in the cluster.
2628

2629
    """
2630
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2631
    assert self.instance is not None, \
2632
      "Cannot retrieve locked instance %s" % self.op.instance_name
2633

    
2634
  def Exec(self, feedback_fn):
2635
    """Remove the instance.
2636

2637
    """
2638
    instance = self.instance
2639
    logging.info("Shutting down instance %s on node %s",
2640
                 instance.name, instance.primary_node)
2641

    
2642
    result = self.rpc.call_instance_shutdown(instance.primary_node, instance,
2643
                                             self.op.shutdown_timeout,
2644
                                             self.op.reason)
2645
    msg = result.fail_msg
2646
    if msg:
2647
      if self.op.ignore_failures:
2648
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
2649
      else:
2650
        raise errors.OpExecError("Could not shutdown instance %s on"
2651
                                 " node %s: %s" %
2652
                                 (instance.name, instance.primary_node, msg))
2653

    
2654
    assert (self.owned_locks(locking.LEVEL_NODE) ==
2655
            self.owned_locks(locking.LEVEL_NODE_RES))
2656
    assert not (set(instance.all_nodes) -
2657
                self.owned_locks(locking.LEVEL_NODE)), \
2658
      "Not owning correct locks"
2659

    
2660
    _RemoveInstance(self, feedback_fn, instance, self.op.ignore_failures)
2661

    
2662

    
2663
def _RemoveInstance(lu, feedback_fn, instance, ignore_failures):
2664
  """Utility function to remove an instance.
2665

2666
  """
2667
  logging.info("Removing block devices for instance %s", instance.name)
2668

    
2669
  if not _RemoveDisks(lu, instance, ignore_failures=ignore_failures):
2670
    if not ignore_failures:
2671
      raise errors.OpExecError("Can't remove instance's disks")
2672
    feedback_fn("Warning: can't remove instance's disks")
2673

    
2674
  logging.info("Removing instance %s out of cluster config", instance.name)
2675

    
2676
  lu.cfg.RemoveInstance(instance.name)
2677

    
2678
  assert not lu.remove_locks.get(locking.LEVEL_INSTANCE), \
2679
    "Instance lock removal conflict"
2680

    
2681
  # Remove lock for the instance
2682
  lu.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2683

    
2684

    
2685
class LUInstanceQuery(NoHooksLU):
2686
  """Logical unit for querying instances.
2687

2688
  """
2689
  # pylint: disable=W0142
2690
  REQ_BGL = False
2691

    
2692
  def CheckArguments(self):
2693
    self.iq = _InstanceQuery(qlang.MakeSimpleFilter("name", self.op.names),
2694
                             self.op.output_fields, self.op.use_locking)
2695

    
2696
  def ExpandNames(self):
2697
    self.iq.ExpandNames(self)
2698

    
2699
  def DeclareLocks(self, level):
2700
    self.iq.DeclareLocks(self, level)
2701

    
2702
  def Exec(self, feedback_fn):
2703
    return self.iq.OldStyleQuery(self)
2704

    
2705

    
2706
def _ExpandNamesForMigration(lu):
2707
  """Expands names for use with L{TLMigrateInstance}.
2708

2709
  @type lu: L{LogicalUnit}
2710

2711
  """
2712
  if lu.op.target_node is not None:
2713
    lu.op.target_node = _ExpandNodeName(lu.cfg, lu.op.target_node)
2714

    
2715
  lu.needed_locks[locking.LEVEL_NODE] = []
2716
  lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2717

    
2718
  lu.needed_locks[locking.LEVEL_NODE_RES] = []
2719
  lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
2720

    
2721
  # The node allocation lock is actually only needed for externally replicated
2722
  # instances (e.g. sharedfile or RBD) and if an iallocator is used.
2723
  lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []
2724

    
2725

    
2726
def _DeclareLocksForMigration(lu, level):
2727
  """Declares locks for L{TLMigrateInstance}.
2728

2729
  @type lu: L{LogicalUnit}
2730
  @param level: Lock level
2731

2732
  """
2733
  if level == locking.LEVEL_NODE_ALLOC:
2734
    assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
2735

    
2736
    instance = lu.cfg.GetInstanceInfo(lu.op.instance_name)
2737

    
2738
    # Node locks are already declared here rather than at LEVEL_NODE as we need
2739
    # the instance object anyway to declare the node allocation lock.
2740
    if instance.disk_template in constants.DTS_EXT_MIRROR:
2741
      if lu.op.target_node is None:
2742
        lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2743
        lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
2744
      else:
2745
        lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
2746
                                               lu.op.target_node]
2747
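      # The node locks were declared explicitly above, so drop the
      # recalculation flag set in _ExpandNamesForMigration.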
      del lu.recalculate_locks[locking.LEVEL_NODE]
2748
    else:
2749
      lu._LockInstancesNodes() # pylint: disable=W0212
2750

    
2751
  elif level == locking.LEVEL_NODE:
2752
    # Node locks are declared together with the node allocation lock
2753
    assert (lu.needed_locks[locking.LEVEL_NODE] or
2754
            lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)
2755

    
2756
  elif level == locking.LEVEL_NODE_RES:
2757
    # Copy node locks
2758
    lu.needed_locks[locking.LEVEL_NODE_RES] = \
2759
      _CopyLockList(lu.needed_locks[locking.LEVEL_NODE])
2760

    
2761

    
2762
class LUInstanceFailover(LogicalUnit):
2763
  """Failover an instance.
2764

2765
  """
2766
  HPATH = "instance-failover"
2767
  HTYPE = constants.HTYPE_INSTANCE
2768
  REQ_BGL = False
2769

    
2770
  def CheckArguments(self):
2771
    """Check the arguments.
2772

2773
    """
2774
    self.iallocator = getattr(self.op, "iallocator", None)
2775
    self.target_node = getattr(self.op, "target_node", None)
2776

    
2777
  def ExpandNames(self):
2778
    self._ExpandAndLockInstance()
2779
    _ExpandNamesForMigration(self)
2780

    
2781
    self._migrater = \
2782
      TLMigrateInstance(self, self.op.instance_name, False, True, False,
2783
                        self.op.ignore_consistency, True,
2784
                        self.op.shutdown_timeout, self.op.ignore_ipolicy)
2785

    
2786
    self.tasklets = [self._migrater]
2787

    
2788
  def DeclareLocks(self, level):
2789
    _DeclareLocksForMigration(self, level)
2790

    
2791
  def BuildHooksEnv(self):
2792
    """Build hooks env.
2793

2794
    This runs on master, primary and secondary nodes of the instance.
2795

2796
    """
2797
    instance = self._migrater.instance
2798
    source_node = instance.primary_node
2799
    target_node = self.op.target_node
2800
    env = {
2801
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2802
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
2803
      "OLD_PRIMARY": source_node,
2804
      "NEW_PRIMARY": target_node,
2805
      }
2806

    
2807
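    # For internally mirrored (DRBD) instances the old primary becomes the
    # new secondary after the failover; other disk templates have no
    # secondary node, so the corresponding variables are left empty.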
    if instance.disk_template in constants.DTS_INT_MIRROR:
2808
      env["OLD_SECONDARY"] = instance.secondary_nodes[0]
2809
      env["NEW_SECONDARY"] = source_node
2810
    else:
2811
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""
2812

    
2813
    env.update(_BuildInstanceHookEnvByObject(self, instance))
2814

    
2815
    return env
2816

    
2817
  def BuildHooksNodes(self):
2818
    """Build hooks nodes.
2819

2820
    """
2821
    instance = self._migrater.instance
2822
    nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
2823
    return (nl, nl + [instance.primary_node])
2824

    
2825

    
2826
class LUInstanceMigrate(LogicalUnit):
2827
  """Migrate an instance.
2828

2829
  This is migration without shutting down, compared to the failover,
2830
  which is done with shutdown.
2831

2832
  """
2833
  HPATH = "instance-migrate"
2834
  HTYPE = constants.HTYPE_INSTANCE
2835
  REQ_BGL = False
2836

    
2837
  def ExpandNames(self):
2838
    self._ExpandAndLockInstance()
2839
    _ExpandNamesForMigration(self)
2840

    
2841
    self._migrater = \
2842
      TLMigrateInstance(self, self.op.instance_name, self.op.cleanup,
2843
                        False, self.op.allow_failover, False,
2844
                        self.op.allow_runtime_changes,
2845
                        constants.DEFAULT_SHUTDOWN_TIMEOUT,
2846
                        self.op.ignore_ipolicy)
2847

    
2848
    self.tasklets = [self._migrater]
2849

    
2850
  def DeclareLocks(self, level):
2851
    _DeclareLocksForMigration(self, level)
2852

    
2853
  def BuildHooksEnv(self):
2854
    """Build hooks env.
2855

2856
    This runs on master, primary and secondary nodes of the instance.
2857

2858
    """
2859
    instance = self._migrater.instance
2860
    source_node = instance.primary_node
2861
    target_node = self.op.target_node
2862
    env = _BuildInstanceHookEnvByObject(self, instance)
2863
    env.update({
2864
      "MIGRATE_LIVE": self._migrater.live,
2865
      "MIGRATE_CLEANUP": self.op.cleanup,
2866
      "OLD_PRIMARY": source_node,
2867
      "NEW_PRIMARY": target_node,
2868
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
2869
      })
2870

    
2871
    if instance.disk_template in constants.DTS_INT_MIRROR:
2872
      env["OLD_SECONDARY"] = target_node
2873
      env["NEW_SECONDARY"] = source_node
2874
    else:
2875
      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = None
2876

    
2877
    return env
2878

    
2879
  def BuildHooksNodes(self):
2880
    """Build hooks nodes.
2881

2882
    """
2883
    instance = self._migrater.instance
2884
    snodes = list(instance.secondary_nodes)
2885
    nl = [self.cfg.GetMasterNode(), instance.primary_node] + snodes
2886
    return (nl, nl)
2887

    
2888

    
2889
class LUInstanceMove(LogicalUnit):
2890
  """Move an instance by data-copying.
2891

2892
  """
2893
  HPATH = "instance-move"
2894
  HTYPE = constants.HTYPE_INSTANCE
2895
  REQ_BGL = False
2896

    
2897
  def ExpandNames(self):
2898
    self._ExpandAndLockInstance()
2899
    target_node = _ExpandNodeName(self.cfg, self.op.target_node)
2900
    self.op.target_node = target_node
2901
    self.needed_locks[locking.LEVEL_NODE] = [target_node]
2902
    self.needed_locks[locking.LEVEL_NODE_RES] = []
2903
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
2904

    
2905
  def DeclareLocks(self, level):
2906
    if level == locking.LEVEL_NODE:
2907
      self._LockInstancesNodes(primary_only=True)
2908
    elif level == locking.LEVEL_NODE_RES:
2909
      # Copy node locks
2910
      self.needed_locks[locking.LEVEL_NODE_RES] = \
2911
        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
2912

    
2913
  def BuildHooksEnv(self):
2914
    """Build hooks env.
2915

2916
    This runs on master, primary and secondary nodes of the instance.
2917

2918
    """
2919
    env = {
2920
      "TARGET_NODE": self.op.target_node,
2921
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
2922
      }
2923
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2924
    return env
2925

    
2926
  def BuildHooksNodes(self):
2927
    """Build hooks nodes.
2928

2929
    """
2930
    nl = [
2931
      self.cfg.GetMasterNode(),
2932
      self.instance.primary_node,
2933
      self.op.target_node,
2934
      ]
2935
    return (nl, nl)
2936

    
2937
  def CheckPrereq(self):
2938
    """Check prerequisites.
2939

2940
    This checks that the instance is in the cluster.
2941

2942
    """
2943
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2944
    assert self.instance is not None, \
2945
      "Cannot retrieve locked instance %s" % self.op.instance_name
2946

    
2947
    if instance.disk_template not in constants.DTS_COPYABLE:
2948
      raise errors.OpPrereqError("Disk template %s not suitable for copying" %
2949
                                 instance.disk_template, errors.ECODE_STATE)
2950

    
2951
    node = self.cfg.GetNodeInfo(self.op.target_node)
2952
    assert node is not None, \
2953
      "Cannot retrieve locked node %s" % self.op.target_node
2954

    
2955
    self.target_node = target_node = node.name
2956

    
2957
    if target_node == instance.primary_node:
2958
      raise errors.OpPrereqError("Instance %s is already on the node %s" %
2959
                                 (instance.name, target_node),
2960
                                 errors.ECODE_STATE)
2961

    
2962
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2963

    
2964
    for idx, dsk in enumerate(instance.disks):
2965
      if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE):
2966
        raise errors.OpPrereqError("Instance disk %d has a complex layout,"
2967
                                   " cannot copy" % idx, errors.ECODE_STATE)
2968

    
2969
    _CheckNodeOnline(self, target_node)
2970
    _CheckNodeNotDrained(self, target_node)
2971
    _CheckNodeVmCapable(self, target_node)
2972
    cluster = self.cfg.GetClusterInfo()
2973
    group_info = self.cfg.GetNodeGroup(node.group)
2974
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, group_info)
2975
    _CheckTargetNodeIPolicy(self, ipolicy, instance, node, self.cfg,
2976
                            ignore=self.op.ignore_ipolicy)
2977

    
2978
    if instance.admin_state == constants.ADMINST_UP:
2979
      # check memory requirements on the target node
2980
      _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2981
                           instance.name, bep[constants.BE_MAXMEM],
2982
                           instance.hypervisor)
2983
    else:
2984
      self.LogInfo("Not checking memory on the secondary node as"
2985
                   " instance will not be started")
2986

    
2987
    # check bridge existence
    _CheckInstanceBridgesExist(self, instance, node=target_node)
2989

    
2990
  def Exec(self, feedback_fn):
2991
    """Move an instance.
2992

2993
    The move is done by shutting it down on its present node, copying
2994
    the data over (slow) and starting it on the new node.
2995

2996
    """
2997
    instance = self.instance
2998

    
2999
    source_node = instance.primary_node
3000
    target_node = self.target_node
3001

    
3002
    self.LogInfo("Shutting down instance %s on source node %s",
3003
                 instance.name, source_node)
3004

    
3005
    assert (self.owned_locks(locking.LEVEL_NODE) ==
3006
            self.owned_locks(locking.LEVEL_NODE_RES))
3007

    
3008
    result = self.rpc.call_instance_shutdown(source_node, instance,
3009
                                             self.op.shutdown_timeout,
3010
                                             self.op.reason)
3011
    msg = result.fail_msg
3012
    if msg:
3013
      if self.op.ignore_consistency:
3014
        self.LogWarning("Could not shutdown instance %s on node %s."
3015
                        " Proceeding anyway. Please make sure node"
3016
                        " %s is down. Error details: %s",
3017
                        instance.name, source_node, source_node, msg)
3018
      else:
3019
        raise errors.OpExecError("Could not shutdown instance %s on"
3020
                                 " node %s: %s" %
3021
                                 (instance.name, source_node, msg))
3022

    
3023
    # create the target disks
3024
    try:
3025
      _CreateDisks(self, instance, target_node=target_node)
3026
    except errors.OpExecError:
3027
      self.LogWarning("Device creation failed")
3028
      self.cfg.ReleaseDRBDMinors(instance.name)
3029
      raise
3030

    
3031
    cluster_name = self.cfg.GetClusterInfo().cluster_name
3032

    
3033
    errs = []
3034
    # activate, get path, copy the data over
3035
    for idx, disk in enumerate(instance.disks):
3036
      self.LogInfo("Copying data for disk %d", idx)
3037
      result = self.rpc.call_blockdev_assemble(target_node, (disk, instance),
3038
                                               instance.name, True, idx)
3039
      if result.fail_msg:
3040
        self.LogWarning("Can't assemble newly created disk %d: %s",
3041
                        idx, result.fail_msg)
3042
        errs.append(result.fail_msg)
3043
        break
3044
      dev_path = result.payload
3045
      result = self.rpc.call_blockdev_export(source_node, (disk, instance),
3046
                                             target_node, dev_path,
3047
                                             cluster_name)
3048
      if result.fail_msg:
3049
        self.LogWarning("Can't copy data over for disk %d: %s",
3050
                        idx, result.fail_msg)
3051
        errs.append(result.fail_msg)
3052
        break
3053

    
3054
    if errs:
3055
      self.LogWarning("Some disks failed to copy, aborting")
3056
      try:
3057
        _RemoveDisks(self, instance, target_node=target_node)
3058
      finally:
3059
        self.cfg.ReleaseDRBDMinors(instance.name)
3060
        raise errors.OpExecError("Errors during disk copy: %s" %
3061
                                 (",".join(errs),))
3062

    
3063
    instance.primary_node = target_node
3064
    self.cfg.Update(instance, feedback_fn)
3065

    
3066
    self.LogInfo("Removing the disks on the original node")
3067
    _RemoveDisks(self, instance, target_node=source_node)
3068

    
3069
    # Only start the instance if it's marked as up
3070
    if instance.admin_state == constants.ADMINST_UP:
3071
      self.LogInfo("Starting instance %s on node %s",
3072
                   instance.name, target_node)
3073

    
3074
      disks_ok, _ = _AssembleInstanceDisks(self, instance,
3075
                                           ignore_secondaries=True)
3076
      if not disks_ok:
3077
        _ShutdownInstanceDisks(self, instance)
3078
        raise errors.OpExecError("Can't activate the instance's disks")
3079

    
3080
      result = self.rpc.call_instance_start(target_node,
3081
                                            (instance, None, None), False,
3082
                                             self.op.reason)
3083
      msg = result.fail_msg
3084
      if msg:
3085
        _ShutdownInstanceDisks(self, instance)
3086
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3087
                                 (instance.name, target_node, msg))
3088

    
3089

    
3090
class TLMigrateInstance(Tasklet):
3091
  """Tasklet class for instance migration.
3092

3093
  @type live: boolean
  @ivar live: whether the migration will be done live or non-live;
      this variable is initialized only after CheckPrereq has run
  @type cleanup: boolean
  @ivar cleanup: Whether we are cleaning up from a failed migration
  @type iallocator: string
  @ivar iallocator: The iallocator used to determine target_node
  @type target_node: string
  @ivar target_node: If given, the target_node to reallocate the instance to
  @type failover: boolean
  @ivar failover: Whether operation results in failover or migration
  @type fallback: boolean
  @ivar fallback: Whether fallback to failover is allowed if migration not
                  possible
  @type ignore_consistency: boolean
  @ivar ignore_consistency: Whether we should ignore consistency between source
                            and target node
  @type shutdown_timeout: int
  @ivar shutdown_timeout: In case of failover, the timeout of the shutdown
  @type ignore_ipolicy: bool
  @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating

  """
3116

    
3117
  # Constants
3118
  _MIGRATION_POLL_INTERVAL = 1      # seconds
3119
  _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds
3120

    
3121
  def __init__(self, lu, instance_name, cleanup, failover, fallback,
3122
               ignore_consistency, allow_runtime_changes, shutdown_timeout,
3123
               ignore_ipolicy):
3124
    """Initializes this class.
3125

3126
    """
3127
    Tasklet.__init__(self, lu)
3128

    
3129
    # Parameters
3130
    self.instance_name = instance_name
3131
    self.cleanup = cleanup
3132
    self.live = False # will be overridden later
3133
    self.failover = failover
3134
    self.fallback = fallback
3135
    self.ignore_consistency = ignore_consistency
3136
    self.shutdown_timeout = shutdown_timeout
3137
    self.ignore_ipolicy = ignore_ipolicy
3138
    self.allow_runtime_changes = allow_runtime_changes
3139

    
3140
  def CheckPrereq(self):
3141
    """Check prerequisites.
3142

3143
    This checks that the instance is in the cluster.
3144

3145
    """
3146
    instance_name = _ExpandInstanceName(self.lu.cfg, self.instance_name)
3147
    instance = self.cfg.GetInstanceInfo(instance_name)
3148
    assert instance is not None
3149
    self.instance = instance
3150
    cluster = self.cfg.GetClusterInfo()
3151

    
3152
    if (not self.cleanup and
3153
        not instance.admin_state == constants.ADMINST_UP and
3154
        not self.failover and self.fallback):
3155
      self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
3156
                      " switching to failover")
3157
      self.failover = True
3158

    
3159
    if instance.disk_template not in constants.DTS_MIRRORED:
3160
      if self.failover:
3161
        text = "failovers"
3162
      else:
3163
        text = "migrations"
3164
      raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
3165
                                 " %s" % (instance.disk_template, text),
3166
                                 errors.ECODE_STATE)
3167

    
3168
    if instance.disk_template in constants.DTS_EXT_MIRROR:
3169
      _CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")
3170

    
3171
      if self.lu.op.iallocator:
3172
        assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
3173
        self._RunAllocator()
3174
      else:
3175
        # We set self.target_node as it is required by
        # BuildHooksEnv
        self.target_node = self.lu.op.target_node
3178

    
3179
      # Check that the target node is correct in terms of instance policy
3180
      nodeinfo = self.cfg.GetNodeInfo(self.target_node)
3181
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
3182
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
3183
                                                              group_info)
3184
      _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
3185
                              ignore=self.ignore_ipolicy)
3186

    
3187
      # self.target_node is already populated, either directly or by the
3188
      # iallocator run
3189
      target_node = self.target_node
3190
      if self.target_node == instance.primary_node:
3191
        raise errors.OpPrereqError("Cannot migrate instance %s"
3192
                                   " to its primary (%s)" %
3193
                                   (instance.name, instance.primary_node),
3194
                                   errors.ECODE_STATE)
3195

    
3196
      if len(self.lu.tasklets) == 1:
3197
        # It is safe to release locks only when we're the only tasklet
3198
        # in the LU
3199
        _ReleaseLocks(self.lu, locking.LEVEL_NODE,
3200
                      keep=[instance.primary_node, self.target_node])
3201
        _ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
3202

    
3203
    else:
3204
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
3205

    
3206
      secondary_nodes = instance.secondary_nodes
3207
      if not secondary_nodes:
3208
        raise errors.ConfigurationError("No secondary node but using"
3209
                                        " %s disk template" %
3210
                                        instance.disk_template)
3211
      target_node = secondary_nodes[0]
3212
      if self.lu.op.iallocator or (self.lu.op.target_node and
3213
                                   self.lu.op.target_node != target_node):
3214
        if self.failover:
3215
          text = "failed over"
3216
        else:
3217
          text = "migrated"
3218
        raise errors.OpPrereqError("Instances with disk template %s cannot"
3219
                                   " be %s to arbitrary nodes"
3220
                                   " (neither an iallocator nor a target"
3221
                                   " node can be passed)" %
3222
                                   (instance.disk_template, text),
3223
                                   errors.ECODE_INVAL)
3224
      nodeinfo = self.cfg.GetNodeInfo(target_node)
3225
      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
3226
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
3227
                                                              group_info)
3228
      _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
3229
                              ignore=self.ignore_ipolicy)
3230

    
3231
    i_be = cluster.FillBE(instance)
3232

    
3233
    # check memory requirements on the secondary node
3234
    if (not self.cleanup and
3235
         (not self.failover or instance.admin_state == constants.ADMINST_UP)):
3236
      self.tgt_free_mem = _CheckNodeFreeMemory(self.lu, target_node,
3237
                                               "migrating instance %s" %
3238
                                               instance.name,
3239
                                               i_be[constants.BE_MINMEM],
3240
                                               instance.hypervisor)
3241
    else:
3242
      self.lu.LogInfo("Not checking memory on the secondary node as"
3243
                      " instance will not be started")
3244

    
3245
    # check if failover must be forced instead of migration
3246
    if (not self.cleanup and not self.failover and
3247
        i_be[constants.BE_ALWAYS_FAILOVER]):
3248
      self.lu.LogInfo("Instance configured to always failover; fallback"
3249
                      " to failover")
3250
      self.failover = True
3251

    
3252
    # check bridge existence
    _CheckInstanceBridgesExist(self.lu, instance, node=target_node)
3254

    
3255
    if not self.cleanup:
3256
      _CheckNodeNotDrained(self.lu, target_node)
3257
      if not self.failover:
3258
        result = self.rpc.call_instance_migratable(instance.primary_node,
3259
                                                   instance)
3260
        if result.fail_msg and self.fallback:
3261
          self.lu.LogInfo("Can't migrate, instance offline, fallback to"
3262
                          " failover")
3263
          self.failover = True
3264
        else:
3265
          result.Raise("Can't migrate, please use failover",
3266
                       prereq=True, ecode=errors.ECODE_STATE)
3267

    
3268
    assert not (self.failover and self.cleanup)
3269

    
3270
    if not self.failover:
3271
      if self.lu.op.live is not None and self.lu.op.mode is not None:
3272
        raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
3273
                                   " parameters are accepted",
3274
                                   errors.ECODE_INVAL)
3275
      if self.lu.op.live is not None:
3276
        if self.lu.op.live:
3277
          self.lu.op.mode = constants.HT_MIGRATION_LIVE
3278
        else:
3279
          self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
3280
        # reset the 'live' parameter to None so that repeated
3281
        # invocations of CheckPrereq do not raise an exception
3282
        self.lu.op.live = None
3283
      elif self.lu.op.mode is None:
3284
        # read the default value from the hypervisor
3285
        i_hv = cluster.FillHV(self.instance, skip_globals=False)
3286
        self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
3287

    
3288
      self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
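      # Hedged summary of the precedence implemented above: an explicit
      # "live" flag wins and is translated into a mode, then an explicit
      # "mode" is honoured, and otherwise the hypervisor default
      # HV_MIGRATION_MODE is used; e.g. live=True ends up as
      # mode=HT_MIGRATION_LIVE and hence self.live=True.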
3289
    else:
3290
      # Failover is never live
3291
      self.live = False
3292

    
3293
    if not (self.failover or self.cleanup):
3294
      remote_info = self.rpc.call_instance_info(instance.primary_node,
3295
                                                instance.name,
3296
                                                instance.hypervisor)
3297
      remote_info.Raise("Error checking instance on node %s" %
3298
                        instance.primary_node)
3299
      instance_running = bool(remote_info.payload)
3300
      if instance_running:
3301
        self.current_mem = int(remote_info.payload["memory"])
3302

    
3303
  def _RunAllocator(self):
3304
    """Run the allocator based on input opcode.
3305

3306
    """
3307
    assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
3308

    
3309
    # FIXME: add a self.ignore_ipolicy option
3310
    req = iallocator.IAReqRelocate(name=self.instance_name,
3311
                                   relocate_from=[self.instance.primary_node])
3312
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
3313

    
3314
    ial.Run(self.lu.op.iallocator)
3315

    
3316
    if not ial.success:
3317
      raise errors.OpPrereqError("Can't compute nodes using"
3318
                                 " iallocator '%s': %s" %
3319
                                 (self.lu.op.iallocator, ial.info),
3320
                                 errors.ECODE_NORES)
3321
    self.target_node = ial.result[0]
3322
    self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3323
                    self.instance_name, self.lu.op.iallocator,
3324
                    utils.CommaJoin(ial.result))
3325

    
3326
  def _WaitUntilSync(self):
3327
    """Poll with custom rpc for disk sync.
3328

3329
    This uses our own step-based rpc call.
3330

3331
    """
3332
    self.feedback_fn("* wait until resync is done")
3333
    all_done = False
3334
    while not all_done:
3335
      all_done = True
3336
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3337
                                            self.nodes_ip,
3338
                                            (self.instance.disks,
3339
                                             self.instance))
3340
      min_percent = 100
3341
      for node, nres in result.items():
3342
        nres.Raise("Cannot resync disks on node %s" % node)
3343
        node_done, node_percent = nres.payload
3344
        all_done = all_done and node_done
3345
        if node_percent is not None:
3346
          min_percent = min(min_percent, node_percent)
3347
      if not all_done:
3348
        if min_percent < 100:
3349
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
3350
        time.sleep(2)
3351

    
3352
  def _EnsureSecondary(self, node):
3353
    """Demote a node to secondary.
3354

3355
    """
3356
    self.feedback_fn("* switching node %s to secondary mode" % node)
3357

    
3358
    for dev in self.instance.disks:
3359
      self.cfg.SetDiskID(dev, node)
3360

    
3361
    result = self.rpc.call_blockdev_close(node, self.instance.name,
3362
                                          self.instance.disks)
3363
    result.Raise("Cannot change disk to secondary on node %s" % node)
3364

    
3365
  def _GoStandalone(self):
3366
    """Disconnect from the network.
3367

3368
    """
3369
    self.feedback_fn("* changing into standalone mode")
3370
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3371
                                               self.instance.disks)
3372
    for node, nres in result.items():
3373
      nres.Raise("Cannot disconnect disks node %s" % node)
3374

    
3375
  def _GoReconnect(self, multimaster):
3376
    """Reconnect to the network.
3377

3378
    """
3379
    if multimaster:
3380
      msg = "dual-master"
3381
    else:
3382
      msg = "single-master"
3383
    self.feedback_fn("* changing disks into %s mode" % msg)
3384
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3385
                                           (self.instance.disks, self.instance),
3386
                                           self.instance.name, multimaster)
3387
    for node, nres in result.items():
3388
      nres.Raise("Cannot change disks config on node %s" % node)
3389

    
3390
  def _ExecCleanup(self):
3391
    """Try to cleanup after a failed migration.
3392

3393
    The cleanup is done by:
3394
      - check that the instance is running only on one node
3395
        (and update the config if needed)
3396
      - change disks on its secondary node to secondary
3397
      - wait until disks are fully synchronized
3398
      - disconnect from the network
3399
      - change disks into single-master mode
3400
      - wait again until disks are fully synchronized
3401

3402
    """
3403
    instance = self.instance
3404
    target_node = self.target_node
3405
    source_node = self.source_node
3406

    
3407
    # check running on only one node
3408
    self.feedback_fn("* checking where the instance actually runs"
3409
                     " (if this hangs, the hypervisor might be in"
3410
                     " a bad state)")
3411
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3412
    for node, result in ins_l.items():
3413
      result.Raise("Can't contact node %s" % node)
3414

    
3415
    runningon_source = instance.name in ins_l[source_node].payload
3416
    runningon_target = instance.name in ins_l[target_node].payload
3417

    
3418
    if runningon_source and runningon_target:
3419
      raise errors.OpExecError("Instance seems to be running on two nodes,"
3420
                               " or the hypervisor is confused; you will have"
3421
                               " to ensure manually that it runs only on one"
3422
                               " and restart this operation")
3423

    
3424
    if not (runningon_source or runningon_target):
3425
      raise errors.OpExecError("Instance does not seem to be running at all;"
3426
                               " in this case it's safer to repair by"
3427
                               " running 'gnt-instance stop' to ensure disk"
3428
                               " shutdown, and then restarting it")
3429

    
3430
    if runningon_target:
3431
      # the migration has actually succeeded, we need to update the config
3432
      self.feedback_fn("* instance running on secondary node (%s),"
3433
                       " updating config" % target_node)
3434
      instance.primary_node = target_node
3435
      self.cfg.Update(instance, self.feedback_fn)
3436
      demoted_node = source_node
3437
    else:
3438
      self.feedback_fn("* instance confirmed to be running on its"
3439
                       " primary node (%s)" % source_node)
3440
      demoted_node = target_node
3441

    
3442
    if instance.disk_template in constants.DTS_INT_MIRROR:
3443
      self._EnsureSecondary(demoted_node)
3444
      try:
3445
        self._WaitUntilSync()
3446
      except errors.OpExecError:
3447
        # we ignore here errors, since if the device is standalone, it
3448
        # won't be able to sync
3449
        pass
3450
      self._GoStandalone()
3451
      self._GoReconnect(False)
3452
      self._WaitUntilSync()
3453

    
3454
    self.feedback_fn("* done")
3455

    
3456
  def _RevertDiskStatus(self):
3457
    """Try to revert the disk status after a failed migration.
3458

3459
    """
3460
    target_node = self.target_node
3461
    if self.instance.disk_template in constants.DTS_EXT_MIRROR:
3462
      return
3463

    
3464
    try:
3465
      self._EnsureSecondary(target_node)
3466
      self._GoStandalone()
3467
      self._GoReconnect(False)
3468
      self._WaitUntilSync()
3469
    except errors.OpExecError, err:
3470
      self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
3471
                         " please try to recover the instance manually;"
3472
                         " error '%s'" % str(err))
3473

    
3474
  def _AbortMigration(self):
3475
    """Call the hypervisor code to abort a started migration.
3476

3477
    """
3478
    instance = self.instance
3479
    target_node = self.target_node
3480
    source_node = self.source_node
3481
    migration_info = self.migration_info
3482

    
3483
    abort_result = self.rpc.call_instance_finalize_migration_dst(target_node,
3484
                                                                 instance,
3485
                                                                 migration_info,
3486
                                                                 False)
3487
    abort_msg = abort_result.fail_msg
3488
    if abort_msg:
3489
      logging.error("Aborting migration failed on target node %s: %s",
3490
                    target_node, abort_msg)
3491
      # Don't raise an exception here, as we still have to try to revert the
      # disk status, even if this step failed.
3493

    
3494
    abort_result = self.rpc.call_instance_finalize_migration_src(
3495
      source_node, instance, False, self.live)
3496
    abort_msg = abort_result.fail_msg
3497
    if abort_msg:
3498
      logging.error("Aborting migration failed on source node %s: %s",
3499
                    source_node, abort_msg)
3500

    
3501
  def _ExecMigration(self):
3502
    """Migrate an instance.
3503

3504
    The migrate is done by:
3505
      - change the disks into dual-master mode
3506
      - wait until disks are fully synchronized again
3507
      - migrate the instance
3508
      - change disks on the new secondary node (the old primary) to secondary
3509
      - wait until disks are fully synchronized
3510
      - change disks into single-master mode
3511

3512
    """
3513
    instance = self.instance
3514
    target_node = self.target_node
3515
    source_node = self.source_node
3516

    
3517
    # Check for hypervisor version mismatch and warn the user.
3518
    nodeinfo = self.rpc.call_node_info([source_node, target_node],
3519
                                       None, [self.instance.hypervisor], False)
3520
    for ninfo in nodeinfo.values():
3521
      ninfo.Raise("Unable to retrieve node information from node '%s'" %
3522
                  ninfo.node)
3523
    (_, _, (src_info, )) = nodeinfo[source_node].payload
3524
    (_, _, (dst_info, )) = nodeinfo[target_node].payload
3525

    
3526
    if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
3527
        (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
3528
      src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
3529
      dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
3530
      if src_version != dst_version:
3531
        self.feedback_fn("* warning: hypervisor version mismatch between"
3532
                         " source (%s) and target (%s) node" %
3533
                         (src_version, dst_version))
3534

    
3535
    self.feedback_fn("* checking disk consistency between source and target")
3536
    for (idx, dev) in enumerate(instance.disks):
3537
      if not _CheckDiskConsistency(self.lu, instance, dev, target_node, False):
3538
        raise errors.OpExecError("Disk %s is degraded or not fully"
3539
                                 " synchronized on target node,"
3540
                                 " aborting migration" % idx)
3541

    
3542
    if self.current_mem > self.tgt_free_mem:
3543
      if not self.allow_runtime_changes:
3544
        raise errors.OpExecError("Memory ballooning not allowed and not enough"
3545
                                 " free memory to fit instance %s on target"
3546
                                 " node %s (have %dMB, need %dMB)" %
3547
                                 (instance.name, target_node,
3548
                                  self.tgt_free_mem, self.current_mem))
3549
      self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
3550
      rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node,
3551
                                                     instance,
3552
                                                     self.tgt_free_mem)
3553
      rpcres.Raise("Cannot modify instance runtime memory")
3554

    
3555
    # First get the migration information from the remote node
3556
    result = self.rpc.call_migration_info(source_node, instance)
3557
    msg = result.fail_msg
3558
    if msg:
3559
      log_err = ("Failed fetching source migration information from %s: %s" %
3560
                 (source_node, msg))
3561
      logging.error(log_err)
3562
      raise errors.OpExecError(log_err)
3563

    
3564
    self.migration_info = migration_info = result.payload
3565

    
3566
    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
3567
      # Then switch the disks to master/master mode
3568
      self._EnsureSecondary(target_node)
3569
      self._GoStandalone()
3570
      self._GoReconnect(True)
3571
      self._WaitUntilSync()
3572

    
3573
    self.feedback_fn("* preparing %s to accept the instance" % target_node)
3574
    result = self.rpc.call_accept_instance(target_node,
3575
                                           instance,
3576
                                           migration_info,
3577
                                           self.nodes_ip[target_node])
3578

    
3579
    msg = result.fail_msg
3580
    if msg:
3581
      logging.error("Instance pre-migration failed, trying to revert"
3582
                    " disk status: %s", msg)
3583
      self.feedback_fn("Pre-migration failed, aborting")
3584
      self._AbortMigration()
3585
      self._RevertDiskStatus()
3586
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3587
                               (instance.name, msg))
3588

    
3589
    self.feedback_fn("* migrating instance to %s" % target_node)
3590
    result = self.rpc.call_instance_migrate(source_node, instance,
3591
                                            self.nodes_ip[target_node],
3592
                                            self.live)
3593
    msg = result.fail_msg
3594
    if msg:
3595
      logging.error("Instance migration failed, trying to revert"
3596
                    " disk status: %s", msg)
3597
      self.feedback_fn("Migration failed, aborting")
3598
      self._AbortMigration()
3599
      self._RevertDiskStatus()
3600
      raise errors.OpExecError("Could not migrate instance %s: %s" %
3601
                               (instance.name, msg))
3602

    
3603
    self.feedback_fn("* starting memory transfer")
3604
    last_feedback = time.time()
3605
    while True:
3606
      result = self.rpc.call_instance_get_migration_status(source_node,
3607
                                                           instance)
3608
      msg = result.fail_msg
3609
      ms = result.payload   # MigrationStatus instance
3610
      if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
3611
        logging.error("Instance migration failed, trying to revert"
3612
                      " disk status: %s", msg)
3613
        self.feedback_fn("Migration failed, aborting")
3614
        self._AbortMigration()
3615
        self._RevertDiskStatus()
3616
        if not msg:
3617
          msg = "hypervisor returned failure"
3618
        raise errors.OpExecError("Could not migrate instance %s: %s" %
3619
                                 (instance.name, msg))
3620

    
3621
      if result.payload.status != constants.HV_MIGRATION_ACTIVE:
3622
        self.feedback_fn("* memory transfer complete")
3623
        break
3624

    
3625
      if (utils.TimeoutExpired(last_feedback,
3626
                               self._MIGRATION_FEEDBACK_INTERVAL) and
3627
          ms.transferred_ram is not None):
3628
        mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
3629
        self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
3630
        last_feedback = time.time()
3631

    
3632
      time.sleep(self._MIGRATION_POLL_INTERVAL)
3633

    
3634
    result = self.rpc.call_instance_finalize_migration_src(source_node,
3635
                                                           instance,
3636
                                                           True,
3637
                                                           self.live)
3638
    msg = result.fail_msg
3639
    if msg:
3640
      logging.error("Instance migration succeeded, but finalization failed"
3641
                    " on the source node: %s", msg)
3642
      raise errors.OpExecError("Could not finalize instance migration: %s" %
3643
                               msg)
3644

    
3645
    instance.primary_node = target_node
3646

    
3647
    # distribute new instance config to the other nodes
3648
    self.cfg.Update(instance, self.feedback_fn)
3649

    
3650
    result = self.rpc.call_instance_finalize_migration_dst(target_node,
3651
                                                           instance,
3652
                                                           migration_info,
3653
                                                           True)
3654
    msg = result.fail_msg
3655
    if msg:
3656
      logging.error("Instance migration succeeded, but finalization failed"
3657
                    " on the target node: %s", msg)
3658
      raise errors.OpExecError("Could not finalize instance migration: %s" %
3659
                               msg)
3660

    
3661
    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
3662
      self._EnsureSecondary(source_node)
3663
      self._WaitUntilSync()
3664
      self._GoStandalone()
3665
      self._GoReconnect(False)
3666
      self._WaitUntilSync()
3667

    
3668
    # If the instance's disk template is `rbd' or `ext' and there was a
3669
    # successful migration, unmap the device from the source node.
3670
    if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
3671
      disks = _ExpandCheckDisks(instance, instance.disks)
3672
      self.feedback_fn("* unmapping instance's disks from %s" % source_node)
3673
      for disk in disks:
3674
        result = self.rpc.call_blockdev_shutdown(source_node, (disk, instance))
3675
        msg = result.fail_msg
3676
        if msg:
3677
          logging.error("Migration was successful, but couldn't unmap the"
3678
                        " block device %s on source node %s: %s",
3679
                        disk.iv_name, source_node, msg)
3680
          logging.error("You need to unmap the device %s manually on %s",
3681
                        disk.iv_name, source_node)
3682

    
3683
    self.feedback_fn("* done")
3684

    
3685
  def _ExecFailover(self):
3686
    """Failover an instance.
3687

3688
    The failover is done by shutting it down on its present node and
3689
    starting it on the secondary.
3690

3691
    """
3692
    instance = self.instance
3693
    primary_node = self.cfg.GetNodeInfo(instance.primary_node)
3694

    
3695
    source_node = instance.primary_node
3696
    target_node = self.target_node
3697

    
3698
    if instance.admin_state == constants.ADMINST_UP:
3699
      self.feedback_fn("* checking disk consistency between source and target")
3700
      for (idx, dev) in enumerate(instance.disks):
3701
        # for drbd, these are drbd over lvm
3702
        if not _CheckDiskConsistency(self.lu, instance, dev, target_node,
3703
                                     False):
3704
          if primary_node.offline:
3705
            self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
3706
                             " target node %s" %
3707
                             (primary_node.name, idx, target_node))
3708
          elif not self.ignore_consistency:
3709
            raise errors.OpExecError("Disk %s is degraded on target node,"
3710
                                     " aborting failover" % idx)
3711
    else:
3712
      self.feedback_fn("* not checking disk consistency as instance is not"
3713
                       " running")
3714

    
3715
    self.feedback_fn("* shutting down instance on source node")
3716
    logging.info("Shutting down instance %s on node %s",
3717
                 instance.name, source_node)
3718

    
3719
    result = self.rpc.call_instance_shutdown(source_node, instance,
3720
                                             self.shutdown_timeout,
3721
                                             self.lu.op.reason)
3722
    msg = result.fail_msg
3723
    if msg:
3724
      if self.ignore_consistency or primary_node.offline:
3725
        self.lu.LogWarning("Could not shutdown instance %s on node %s,"
3726
                           " proceeding anyway; please make sure node"
3727
                           " %s is down; error details: %s",
3728
                           instance.name, source_node, source_node, msg)
3729
      else:
3730
        raise errors.OpExecError("Could not shutdown instance %s on"
3731
                                 " node %s: %s" %
3732
                                 (instance.name, source_node, msg))
3733

    
3734
    self.feedback_fn("* deactivating the instance's disks on source node")
3735
    if not _ShutdownInstanceDisks(self.lu, instance, ignore_primary=True):
3736
      raise errors.OpExecError("Can't shut down the instance's disks")
3737

    
3738
    instance.primary_node = target_node
3739
    # distribute new instance config to the other nodes
3740
    self.cfg.Update(instance, self.feedback_fn)
3741

    
3742
    # Only start the instance if it's marked as up
3743
    if instance.admin_state == constants.ADMINST_UP:
3744
      self.feedback_fn("* activating the instance's disks on target node %s" %
3745
                       target_node)
3746
      logging.info("Starting instance %s on node %s",
3747
                   instance.name, target_node)
3748

    
3749
      disks_ok, _ = _AssembleInstanceDisks(self.lu, instance,
3750
                                           ignore_secondaries=True)
3751
      if not disks_ok:
3752
        _ShutdownInstanceDisks(self.lu, instance)
3753
        raise errors.OpExecError("Can't activate the instance's disks")
3754

    
3755
      self.feedback_fn("* starting the instance on the target node %s" %
3756
                       target_node)
3757
      result = self.rpc.call_instance_start(target_node, (instance, None, None),
3758
                                            False, self.lu.op.reason)
3759
      msg = result.fail_msg
3760
      if msg:
3761
        _ShutdownInstanceDisks(self.lu, instance)
3762
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3763
                                 (instance.name, target_node, msg))
3764

    
3765
  def Exec(self, feedback_fn):
3766
    """Perform the migration.
3767

3768
    """
3769
    self.feedback_fn = feedback_fn
3770
    self.source_node = self.instance.primary_node
3771

    
3772
    # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
3773
    if self.instance.disk_template in constants.DTS_INT_MIRROR:
3774
      self.target_node = self.instance.secondary_nodes[0]
3775
      # Otherwise self.target_node has been populated either
3776
      # directly, or through an iallocator.
3777

    
3778
    self.all_nodes = [self.source_node, self.target_node]
3779
    self.nodes_ip = dict((name, node.secondary_ip) for (name, node)
3780
                         in self.cfg.GetMultiNodeInfo(self.all_nodes))
3781

    
3782
    if self.failover:
3783
      feedback_fn("Failover instance %s" % self.instance.name)
3784
      self._ExecFailover()
3785
    else:
3786
      feedback_fn("Migrating instance %s" % self.instance.name)
3787

    
3788
      if self.cleanup:
3789
        return self._ExecCleanup()
3790
      else:
3791
        return self._ExecMigration()
3792

    
3793

    
3794
def _CreateBlockDev(lu, node, instance, device, force_create, info,
3795
                    force_open):
3796
  """Wrapper around L{_CreateBlockDevInner}.
3797

3798
  This method annotates the root device first.
3799

3800
  """
3801
  (disk,) = _AnnotateDiskParams(instance, [device], lu.cfg)
3802
  excl_stor = _IsExclusiveStorageEnabledNodeName(lu.cfg, node)
3803
  return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
3804
                              force_open, excl_stor)
3805

    
3806

    
3807
def _CreateBlockDevInner(lu, node, instance, device, force_create,
3808
                         info, force_open, excl_stor):
3809
  """Create a tree of block devices on a given node.
3810

3811
  If this device type has to be created on secondaries, create it and
3812
  all its children.
3813

3814
  If not, just recurse to children keeping the same 'force' value.
3815

3816
  @attention: The device has to be annotated already.
3817

3818
  @param lu: the lu on whose behalf we execute
3819
  @param node: the node on which to create the device
3820
  @type instance: L{objects.Instance}
3821
  @param instance: the instance which owns the device
3822
  @type device: L{objects.Disk}
3823
  @param device: the device to create
3824
  @type force_create: boolean
3825
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has the
      CreateOnSecondary() attribute
3828
  @param info: the extra 'metadata' we should attach to the device
3829
      (this will be represented as a LVM tag)
3830
  @type force_open: boolean
3831
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
3835
  @type excl_stor: boolean
3836
  @param excl_stor: Whether exclusive_storage is active for the node
3837

3838
  @return: list of created devices
3839
  """
3840
  created_devices = []
3841
  try:
3842
    if device.CreateOnSecondary():
3843
      force_create = True
3844

    
3845
    if device.children:
3846
      for child in device.children:
3847
        devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
3848
                                    info, force_open, excl_stor)
3849
        created_devices.extend(devs)
3850

    
3851
    if not force_create:
3852
      return created_devices
3853

    
3854
    _CreateSingleBlockDev(lu, node, instance, device, info, force_open,
3855
                          excl_stor)
3856
    # The device has been completely created, so there is no point in keeping
3857
    # its subdevices in the list. We just add the device itself instead.
3858
    created_devices = [(node, device)]
3859
    return created_devices
3860

    
3861
  except errors.DeviceCreationError, e:
3862
    e.created_devices.extend(created_devices)
3863
    raise e
3864
  except errors.OpExecError, e:
3865
    raise errors.DeviceCreationError(str(e), created_devices)
3866

    
3867

    
3868
def _CreateSingleBlockDev(lu, node, instance, device, info, force_open,
3869
                          excl_stor):
3870
  """Create a single block device on a given node.
3871

3872
  This will not recurse over children of the device, so they must be
3873
  created in advance.
3874

3875
  @param lu: the lu on whose behalf we execute
3876
  @param node: the node on which to create the device
3877
  @type instance: L{objects.Instance}
3878
  @param instance: the instance which owns the device
3879
  @type device: L{objects.Disk}
3880
  @param device: the device to create
3881
  @param info: the extra 'metadata' we should attach to the device
3882
      (this will be represented as a LVM tag)
3883
  @type force_open: boolean
3884
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
3888
  @type excl_stor: boolean
3889
  @param excl_stor: Whether exclusive_storage is active for the node
3890

3891
  """
3892
  lu.cfg.SetDiskID(device, node)
3893
  result = lu.rpc.call_blockdev_create(node, device, device.size,
3894
                                       instance.name, force_open, info,
3895
                                       excl_stor)
3896
  result.Raise("Can't create block device %s on"
3897
               " node %s for instance %s" % (device, node, instance.name))
3898
  if device.physical_id is None:
3899
    device.physical_id = result.payload
3900

    
3901

    
3902
def _GenerateUniqueNames(lu, exts):
3903
  """Generate a suitable LV name.
3904

3905
  This will generate a logical volume name for the given instance.
3906

3907
  """
3908
  results = []
3909
  for val in exts:
3910
    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
3911
    results.append("%s%s" % (new_id, val))
3912
  return results
3913

    
3914

    
3915
def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
3916
                         iv_name, p_minor, s_minor):
3917
  """Generate a drbd8 device complete with its children.
3918

3919
  """
3920
  assert len(vgnames) == len(names) == 2
3921
  port = lu.cfg.AllocatePort()
3922
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
3923

    
3924
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3925
                          logical_id=(vgnames[0], names[0]),
3926
                          params={})
3927
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
3928
  dev_meta = objects.Disk(dev_type=constants.LD_LV,
3929
                          size=constants.DRBD_META_SIZE,
3930
                          logical_id=(vgnames[1], names[1]),
3931
                          params={})
3932
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
3933
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3934
                          logical_id=(primary, secondary, port,
3935
                                      p_minor, s_minor,
3936
                                      shared_secret),
3937
                          children=[dev_data, dev_meta],
3938
                          iv_name=iv_name, params={})
3939
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
3940
  return drbd_dev
3941

    
3942

    
3943
_DISK_TEMPLATE_NAME_PREFIX = {
3944
  constants.DT_PLAIN: "",
3945
  constants.DT_RBD: ".rbd",
3946
  constants.DT_EXT: ".ext",
3947
  }
3948

    
3949

    
3950
_DISK_TEMPLATE_DEVICE_TYPE = {
3951
  constants.DT_PLAIN: constants.LD_LV,
3952
  constants.DT_FILE: constants.LD_FILE,
3953
  constants.DT_SHARED_FILE: constants.LD_FILE,
3954
  constants.DT_BLOCK: constants.LD_BLOCKDEV,
3955
  constants.DT_RBD: constants.LD_RBD,
3956
  constants.DT_EXT: constants.LD_EXT,
3957
  }
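# Hedged usage note: _GenerateDiskTemplate below consults this table for the
# non-DRBD, non-diskless templates; e.g. constants.DT_PLAIN becomes an LD_LV
# device whose logical_id is (volume_group, lv_name), while DT_RBD becomes an
# LD_RBD device with logical_id ("rbd", <generated name>).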
3958

    
3959

    
3960
def _GenerateDiskTemplate(
3961
  lu, template_name, instance_name, primary_node, secondary_nodes,
3962
  disk_info, file_storage_dir, file_driver, base_index,
3963
  feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
3964
  _req_shr_file_storage=opcodes.RequireSharedFileStorage):
3965
  """Generate the entire disk layout for a given template type.
3966

3967
  """
3968
  vgname = lu.cfg.GetVGName()
3969
  disk_count = len(disk_info)
3970
  disks = []
3971

    
3972
  if template_name == constants.DT_DISKLESS:
3973
    pass
3974
  elif template_name == constants.DT_DRBD8:
3975
    if len(secondary_nodes) != 1:
3976
      raise errors.ProgrammerError("Wrong template configuration")
3977
    remote_node = secondary_nodes[0]
3978
    minors = lu.cfg.AllocateDRBDMinor(
3979
      [primary_node, remote_node] * len(disk_info), instance_name)
3980

    
3981
    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
3982
                                                       full_disk_params)
3983
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
3984

    
3985
    names = []
3986
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
3987
                                               for i in range(disk_count)]):
3988
      names.append(lv_prefix + "_data")
3989
      names.append(lv_prefix + "_meta")
3990
    for idx, disk in enumerate(disk_info):
3991
      disk_index = idx + base_index
3992
      data_vg = disk.get(constants.IDISK_VG, vgname)
3993
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
3994
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3995
                                      disk[constants.IDISK_SIZE],
3996
                                      [data_vg, meta_vg],
3997
                                      names[idx * 2:idx * 2 + 2],
3998
                                      "disk/%d" % disk_index,
3999
                                      minors[idx * 2], minors[idx * 2 + 1])
4000
      disk_dev.mode = disk[constants.IDISK_MODE]
4001
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
4002
      disks.append(disk_dev)
4003
  else:
4004
    if secondary_nodes:
4005
      raise errors.ProgrammerError("Wrong template configuration")
4006

    
4007
    if template_name == constants.DT_FILE:
4008
      _req_file_storage()
4009
    elif template_name == constants.DT_SHARED_FILE:
4010
      _req_shr_file_storage()
4011

    
4012
    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
4013
    if name_prefix is None:
4014
      names = None
4015
    else:
4016
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
4017
                                        (name_prefix, base_index + i)
4018
                                        for i in range(disk_count)])
4019

    
4020
    if template_name == constants.DT_PLAIN:
4021

    
4022
      def logical_id_fn(idx, _, disk):
4023
        vg = disk.get(constants.IDISK_VG, vgname)
4024
        return (vg, names[idx])
4025

    
4026
    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
4027
      logical_id_fn = \
4028
        lambda _, disk_index, disk: (file_driver,
4029
                                     "%s/disk%d" % (file_storage_dir,
4030
                                                    disk_index))
4031
    elif template_name == constants.DT_BLOCK:
4032
      logical_id_fn = \
4033
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
4034
                                       disk[constants.IDISK_ADOPT])
4035
    elif template_name == constants.DT_RBD:
4036
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
4037
    elif template_name == constants.DT_EXT:
4038
      def logical_id_fn(idx, _, disk):
4039
        provider = disk.get(constants.IDISK_PROVIDER, None)
4040
        if provider is None:
4041
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
4042
                                       " not found", constants.DT_EXT,
4043
                                       constants.IDISK_PROVIDER)
4044
        return (provider, names[idx])
4045
    else:
4046
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
4047

    
4048
    dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
4049

    
4050
    for idx, disk in enumerate(disk_info):
4051
      params = {}
4052
      # Only for the Ext template add disk_info to params
4053
      if template_name == constants.DT_EXT:
4054
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
4055
        for key in disk:
4056
          if key not in constants.IDISK_PARAMS:
4057
            params[key] = disk[key]
4058
      disk_index = idx + base_index
4059
      size = disk[constants.IDISK_SIZE]
4060
      feedback_fn("* disk %s, size %s" %
4061
                  (disk_index, utils.FormatUnit(size, "h")))
4062
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
4063
                              logical_id=logical_id_fn(idx, disk_index, disk),
4064
                              iv_name="disk/%d" % disk_index,
4065
                              mode=disk[constants.IDISK_MODE],
4066
                              params=params)
4067
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
4068
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
4069
      disks.append(disk_dev)
4070

    
4071
  return disks
4072

    
4073

    
4074
def _GetInstanceInfoText(instance):
4075
  """Compute that text that should be added to the disk's metadata.
4076

4077
  """
4078
  return "originstname+%s" % instance.name
4079

    
4080

    
4081
def _CalcEta(time_taken, written, total_size):
4082
  """Calculates the ETA based on size written and total size.
4083

4084
  @param time_taken: The time taken so far
4085
  @param written: amount written so far
4086
  @param total_size: The total size of data to be written
4087
  @return: The remaining time in seconds
4088

4089
  ""&qu