1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable=W0201,C0302
25

    
26
# W0201 since most LU attributes are defined in CheckPrereq or similar
27
# functions
28

    
29
# C0302: since we have waaaay too many lines in this module
30

    
31
import os
32
import time
33
import logging
34
import copy
35
import OpenSSL
36
import itertools
37
import operator
38

    
39
from ganeti import utils
40
from ganeti import errors
41
from ganeti import hypervisor
42
from ganeti import locking
43
from ganeti import constants
44
from ganeti import objects
45
from ganeti import compat
46
from ganeti import masterd
47
from ganeti import netutils
48
from ganeti import query
49
from ganeti import qlang
50
from ganeti import opcodes
51
from ganeti import ht
52
from ganeti import rpc
53
from ganeti import pathutils
54
from ganeti import network
55
from ganeti.masterd import iallocator
56

    
57
from ganeti.cmdlib.base import ResultWithJobs, LogicalUnit, NoHooksLU, \
58
  Tasklet, _QueryBase
59
from ganeti.cmdlib.common import _ExpandInstanceName, _ExpandItemName, \
60
  _ExpandNodeName, _ShareAll, _CheckNodeGroupInstances, _GetWantedNodes, \
61
  _GetWantedInstances, _RunPostHook, _RedistributeAncillaryFiles, \
62
  _MergeAndVerifyHvState, _MergeAndVerifyDiskState, _GetUpdatedIPolicy, \
63
  _ComputeNewInstanceViolations, _GetUpdatedParams, _CheckOSParams, \
64
  _CheckHVParams, _AdjustCandidatePool, _CheckNodePVs, \
65
  _ComputeIPolicyInstanceViolation, _AnnotateDiskParams, _SupportsOob, \
66
  _ComputeIPolicySpecViolation, _GetDefaultIAllocator, \
67
  _CheckInstancesNodeGroups, _LoadNodeEvacResult, _MapInstanceDisksToNodes, \
68
  _CheckInstanceNodeGroups
69

    
70
from ganeti.cmdlib.cluster import LUClusterActivateMasterIp, \
71
  LUClusterDeactivateMasterIp, LUClusterConfigQuery, LUClusterDestroy, \
72
  LUClusterPostInit, _ClusterQuery, LUClusterQuery, LUClusterRedistConf, \
73
  LUClusterRename, LUClusterRepairDiskSizes, LUClusterSetParams, \
74
  LUClusterVerify, LUClusterVerifyConfig, LUClusterVerifyGroup, \
75
  LUClusterVerifyDisks
76
from ganeti.cmdlib.group import LUGroupAdd, LUGroupAssignNodes, \
77
  _GroupQuery, LUGroupQuery, LUGroupSetParams, LUGroupRemove, \
78
  LUGroupRename, LUGroupEvacuate, LUGroupVerifyDisks
79
from ganeti.cmdlib.tags import LUTagsGet, LUTagsSearch, LUTagsSet, LUTagsDel
80
from ganeti.cmdlib.network import LUNetworkAdd, LUNetworkRemove, \
81
  LUNetworkSetParams, _NetworkQuery, LUNetworkQuery, LUNetworkConnect, \
82
  LUNetworkDisconnect
83
from ganeti.cmdlib.test import LUTestDelay, LUTestJqueue, LUTestAllocator
84

    
85
import ganeti.masterd.instance # pylint: disable=W0611
86

    
87

    
88
# States of instance
89
INSTANCE_DOWN = [constants.ADMINST_DOWN]
90
INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
91
INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE]
92

    
93
#: Instance status in which an instance can be marked as offline/online
94
CAN_CHANGE_INSTANCE_OFFLINE = (frozenset(INSTANCE_DOWN) | frozenset([
95
  constants.ADMINST_OFFLINE,
96
  ]))
97

    
98

    
99
def _IsExclusiveStorageEnabledNode(cfg, node):
100
  """Whether exclusive_storage is in effect for the given node.
101

102
  @type cfg: L{config.ConfigWriter}
103
  @param cfg: The cluster configuration
104
  @type node: L{objects.Node}
105
  @param node: The node
106
  @rtype: bool
107
  @return: The effective value of exclusive_storage
108

109
  """
110
  return cfg.GetNdParams(node)[constants.ND_EXCLUSIVE_STORAGE]
111

    
112

    
113
def _IsExclusiveStorageEnabledNodeName(cfg, nodename):
114
  """Whether exclusive_storage is in effect for the given node.
115

116
  @type cfg: L{config.ConfigWriter}
117
  @param cfg: The cluster configuration
118
  @type nodename: string
119
  @param nodename: The node
120
  @rtype: bool
121
  @return: The effective value of exclusive_storage
122
  @raise errors.OpPrereqError: if no node exists with the given name
123

124
  """
125
  ni = cfg.GetNodeInfo(nodename)
126
  if ni is None:
127
    raise errors.OpPrereqError("Invalid node name %s" % nodename,
128
                               errors.ECODE_NOENT)
129
  return _IsExclusiveStorageEnabledNode(cfg, ni)
130

    
131

    
132
def _CopyLockList(names):
133
  """Makes a copy of a list of lock names.
134

135
  Handles L{locking.ALL_SET} correctly.
136

137
  """
138
  if names == locking.ALL_SET:
139
    return locking.ALL_SET
140
  else:
141
    return names[:]
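
# Illustrative behaviour (not part of the upstream module): the ALL_SET
# sentinel is passed through unchanged, while explicit name lists are copied
# so callers may mutate the result without affecting the original.
#
#   >>> _CopyLockList(locking.ALL_SET) is locking.ALL_SET
#   True
#   >>> names = ["node1.example.com", "node2.example.com"]  # hypothetical
#   >>> copied = _CopyLockList(names)
#   >>> copied == names, copied is names
#   (True, False)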
142

    
143

    
144
def _ReleaseLocks(lu, level, names=None, keep=None):
145
  """Releases locks owned by an LU.
146

147
  @type lu: L{LogicalUnit}
148
  @param level: Lock level
149
  @type names: list or None
150
  @param names: Names of locks to release
151
  @type keep: list or None
152
  @param keep: Names of locks to retain
153

154
  """
155
  assert not (keep is not None and names is not None), \
156
         "Only one of the 'names' and the 'keep' parameters can be given"
157

    
158
  if names is not None:
159
    should_release = names.__contains__
160
  elif keep:
161
    should_release = lambda name: name not in keep
162
  else:
163
    should_release = None
164

    
165
  owned = lu.owned_locks(level)
166
  if not owned:
167
    # Not owning any lock at this level, do nothing
168
    pass
169

    
170
  elif should_release:
171
    retain = []
172
    release = []
173

    
174
    # Determine which locks to release
175
    for name in owned:
176
      if should_release(name):
177
        release.append(name)
178
      else:
179
        retain.append(name)
180

    
181
    assert len(lu.owned_locks(level)) == (len(retain) + len(release))
182

    
183
    # Release just some locks
184
    lu.glm.release(level, names=release)
185

    
186
    assert frozenset(lu.owned_locks(level)) == frozenset(retain)
187
  else:
188
    # Release everything
189
    lu.glm.release(level)
190

    
191
    assert not lu.glm.is_owned(level), "No locks should be owned"
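
# Sketch of the two calling conventions (node names are hypothetical):
#
#   # Keep only the listed locks, releasing everything else at the level:
#   _ReleaseLocks(lu, locking.LEVEL_NODE, keep=["node1.example.com"])
#
#   # Release exactly the listed locks, retaining the rest:
#   _ReleaseLocks(lu, locking.LEVEL_NODE, names=["node2.example.com"])
#
# Passing both 'names' and 'keep' trips the assertion above; passing neither
# releases every lock held at the given level.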
192

    
193

    
194
def _CheckOutputFields(static, dynamic, selected):
195
  """Checks whether all selected fields are valid.
196

197
  @type static: L{utils.FieldSet}
198
  @param static: static fields set
199
  @type dynamic: L{utils.FieldSet}
200
  @param dynamic: dynamic fields set
  @param selected: the list of output fields requested by the caller
201

202
  """
203
  f = utils.FieldSet()
204
  f.Extend(static)
205
  f.Extend(dynamic)
206

    
207
  delta = f.NonMatching(selected)
208
  if delta:
209
    raise errors.OpPrereqError("Unknown output fields selected: %s"
210
                               % ",".join(delta), errors.ECODE_INVAL)
211
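
# Example (illustrative field names): validating the user-requested output
# fields of a query LU against the statically and dynamically supported sets.
#
#   _CheckOutputFields(static=utils.FieldSet("name", "pinst_cnt"),
#                      dynamic=utils.FieldSet("dtotal", "dfree"),
#                      selected=self.op.output_fields)
#
# Any selected field not present in either set raises errors.OpPrereqError
# listing the unknown field names.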

    
212

    
213
def _CheckParamsNotGlobal(params, glob_pars, kind, bad_levels, good_levels):
214
  """Make sure that none of the given parameters is global.
215

216
  If a global parameter is found, an L{errors.OpPrereqError} exception is
217
  raised. This is used to avoid setting global parameters for individual nodes.
218

219
  @type params: dictionary
220
  @param params: Parameters to check
221
  @type glob_pars: dictionary
222
  @param glob_pars: Forbidden parameters
223
  @type kind: string
224
  @param kind: Kind of parameters (e.g. "node")
225
  @type bad_levels: string
226
  @param bad_levels: Level(s) at which the parameters are forbidden (e.g.
227
      "instance")
228
  @type good_levels: string
229
  @param good_levels: Level(s) at which the parameters are allowed (e.g.
230
      "cluster or group")
231

232
  """
233
  used_globals = glob_pars.intersection(params)
234
  if used_globals:
235
    msg = ("The following %s parameters are global and cannot"
236
           " be customized at %s level, please modify them at"
237
           " %s level: %s" %
238
           (kind, bad_levels, good_levels, utils.CommaJoin(used_globals)))
239
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
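
# Example (illustrative, with a hypothetical op_ndparams dict): reject
# per-node overrides of parameters that may only be set cluster-wide, such as
# exclusive_storage:
#
#   _CheckParamsNotGlobal(op_ndparams,
#                         frozenset([constants.ND_EXCLUSIVE_STORAGE]),
#                         "node", "node", "cluster or group")
#
# If op_ndparams contains constants.ND_EXCLUSIVE_STORAGE, errors.OpPrereqError
# is raised naming the offending parameter and the allowed level.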
240

    
241

    
242
def _CheckNodeOnline(lu, node, msg=None):
243
  """Ensure that a given node is online.
244

245
  @param lu: the LU on behalf of which we make the check
246
  @param node: the node to check
247
  @param msg: if passed, should be a message to replace the default one
248
  @raise errors.OpPrereqError: if the node is offline
249

250
  """
251
  if msg is None:
252
    msg = "Can't use offline node"
253
  if lu.cfg.GetNodeInfo(node).offline:
254
    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)
255

    
256

    
257
def _CheckNodeNotDrained(lu, node):
258
  """Ensure that a given node is not drained.
259

260
  @param lu: the LU on behalf of which we make the check
261
  @param node: the node to check
262
  @raise errors.OpPrereqError: if the node is drained
263

264
  """
265
  if lu.cfg.GetNodeInfo(node).drained:
266
    raise errors.OpPrereqError("Can't use drained node %s" % node,
267
                               errors.ECODE_STATE)
268

    
269

    
270
def _CheckNodeVmCapable(lu, node):
271
  """Ensure that a given node is vm capable.
272

273
  @param lu: the LU on behalf of which we make the check
274
  @param node: the node to check
275
  @raise errors.OpPrereqError: if the node is not vm capable
276

277
  """
278
  if not lu.cfg.GetNodeInfo(node).vm_capable:
279
    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
280
                               errors.ECODE_STATE)
281

    
282

    
283
def _CheckNodeHasOS(lu, node, os_name, force_variant):
284
  """Ensure that a node supports a given OS.
285

286
  @param lu: the LU on behalf of which we make the check
287
  @param node: the node to check
288
  @param os_name: the OS to query about
289
  @param force_variant: whether to ignore variant errors
290
  @raise errors.OpPrereqError: if the node is not supporting the OS
291

292
  """
293
  result = lu.rpc.call_os_get(node, os_name)
294
  result.Raise("OS '%s' not in supported OS list for node %s" %
295
               (os_name, node),
296
               prereq=True, ecode=errors.ECODE_INVAL)
297
  if not force_variant:
298
    _CheckOSVariant(result.payload, os_name)
299

    
300

    
301
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
302
  """Ensure that a node has the given secondary ip.
303

304
  @type lu: L{LogicalUnit}
305
  @param lu: the LU on behalf of which we make the check
306
  @type node: string
307
  @param node: the node to check
308
  @type secondary_ip: string
309
  @param secondary_ip: the ip to check
310
  @type prereq: boolean
311
  @param prereq: whether to throw a prerequisite or an execute error
312
  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
313
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
314

315
  """
316
  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
317
  result.Raise("Failure checking secondary ip on node %s" % node,
318
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
319
  if not result.payload:
320
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
321
           " please fix and re-run this command" % secondary_ip)
322
    if prereq:
323
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
324
    else:
325
      raise errors.OpExecError(msg)
326

    
327

    
328
def _GetClusterDomainSecret():
329
  """Reads the cluster domain secret.
330

331
  """
332
  return utils.ReadOneLineFile(pathutils.CLUSTER_DOMAIN_SECRET_FILE,
333
                               strict=True)
334

    
335

    
336
def _CheckInstanceState(lu, instance, req_states, msg=None):
337
  """Ensure that an instance is in one of the required states.
338

339
  @param lu: the LU on behalf of which we make the check
340
  @param instance: the instance to check
341
  @param msg: if passed, should be a message to replace the default one
342
  @raise errors.OpPrereqError: if the instance is not in the required state
343

344
  """
345
  if msg is None:
346
    msg = ("can't use instance from outside %s states" %
347
           utils.CommaJoin(req_states))
348
  if instance.admin_state not in req_states:
349
    raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" %
350
                               (instance.name, instance.admin_state, msg),
351
                               errors.ECODE_STATE)
352

    
353
  if constants.ADMINST_UP not in req_states:
354
    pnode = instance.primary_node
355
    if not lu.cfg.GetNodeInfo(pnode).offline:
356
      ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
357
      ins_l.Raise("Can't contact node %s for instance information" % pnode,
358
                  prereq=True, ecode=errors.ECODE_ENVIRON)
359
      if instance.name in ins_l.payload:
360
        raise errors.OpPrereqError("Instance %s is running, %s" %
361
                                   (instance.name, msg), errors.ECODE_STATE)
362
    else:
363
      lu.LogWarning("Primary node offline, ignoring check that instance"
364
                     " is down")
365

    
366

    
367
def _ComputeIPolicyInstanceSpecViolation(
368
  ipolicy, instance_spec, disk_template,
369
  _compute_fn=_ComputeIPolicySpecViolation):
370
  """Compute whether an instance spec meets the specs of the ipolicy.
371

372
  @type ipolicy: dict
373
  @param ipolicy: The ipolicy to verify against
374
  @type instance_spec: dict
375
  @param instance_spec: The instance spec to verify
376
  @type disk_template: string
377
  @param disk_template: the disk template of the instance
378
  @param _compute_fn: The function to verify ipolicy (unittest only)
379
  @see: L{_ComputeIPolicySpecViolation}
380

381
  """
382
  mem_size = instance_spec.get(constants.ISPEC_MEM_SIZE, None)
383
  cpu_count = instance_spec.get(constants.ISPEC_CPU_COUNT, None)
384
  disk_count = instance_spec.get(constants.ISPEC_DISK_COUNT, 0)
385
  disk_sizes = instance_spec.get(constants.ISPEC_DISK_SIZE, [])
386
  nic_count = instance_spec.get(constants.ISPEC_NIC_COUNT, 0)
387
  spindle_use = instance_spec.get(constants.ISPEC_SPINDLE_USE, None)
388

    
389
  return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
390
                     disk_sizes, spindle_use, disk_template)
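
# Illustrative call (values hypothetical): instance_spec uses the ISPEC_* keys
# read above; keys that are absent fall back to None/0/[] via the .get()
# calls.
#
#   spec = {
#     constants.ISPEC_MEM_SIZE: 512,
#     constants.ISPEC_CPU_COUNT: 1,
#     constants.ISPEC_DISK_COUNT: 1,
#     constants.ISPEC_DISK_SIZE: [1024],
#     constants.ISPEC_NIC_COUNT: 1,
#     constants.ISPEC_SPINDLE_USE: 1,
#     }
#   violations = _ComputeIPolicyInstanceSpecViolation(ipolicy, spec,
#                                                     constants.DT_PLAIN)
#
# The result is a (possibly empty) list of human-readable violation messages,
# as produced by _ComputeIPolicySpecViolation.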
391

    
392

    
393
def _ComputeIPolicyNodeViolation(ipolicy, instance, current_group,
394
                                 target_group, cfg,
395
                                 _compute_fn=_ComputeIPolicyInstanceViolation):
396
  """Compute if instance meets the specs of the new target group.
397

398
  @param ipolicy: The ipolicy to verify
399
  @param instance: The instance object to verify
400
  @param current_group: The current group of the instance
401
  @param target_group: The new group of the instance
402
  @type cfg: L{config.ConfigWriter}
403
  @param cfg: Cluster configuration
404
  @param _compute_fn: The function to verify ipolicy (unittest only)
405
  @see: L{_ComputeIPolicySpecViolation}
406

407
  """
408
  if current_group == target_group:
409
    return []
410
  else:
411
    return _compute_fn(ipolicy, instance, cfg)
412

    
413

    
414
def _CheckTargetNodeIPolicy(lu, ipolicy, instance, node, cfg, ignore=False,
415
                            _compute_fn=_ComputeIPolicyNodeViolation):
416
  """Checks that the target node is correct in terms of instance policy.
417

418
  @param ipolicy: The ipolicy to verify
419
  @param instance: The instance object to verify
420
  @param node: The new node to relocate
421
  @type cfg: L{config.ConfigWriter}
422
  @param cfg: Cluster configuration
423
  @param ignore: Ignore violations of the ipolicy
424
  @param _compute_fn: The function to verify ipolicy (unittest only)
425
  @see: L{_ComputeIPolicySpecViolation}
426

427
  """
428
  primary_node = lu.cfg.GetNodeInfo(instance.primary_node)
429
  res = _compute_fn(ipolicy, instance, primary_node.group, node.group, cfg)
430

    
431
  if res:
432
    msg = ("Instance does not meet target node group's (%s) instance"
433
           " policy: %s") % (node.group, utils.CommaJoin(res))
434
    if ignore:
435
      lu.LogWarning(msg)
436
    else:
437
      raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
438

    
439

    
440
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
441
                          minmem, maxmem, vcpus, nics, disk_template, disks,
442
                          bep, hvp, hypervisor_name, tags):
443
  """Builds instance-related env variables for hooks.
444

445
  This builds the hook environment from individual variables.
446

447
  @type name: string
448
  @param name: the name of the instance
449
  @type primary_node: string
450
  @param primary_node: the name of the instance's primary node
451
  @type secondary_nodes: list
452
  @param secondary_nodes: list of secondary nodes as strings
453
  @type os_type: string
454
  @param os_type: the name of the instance's OS
455
  @type status: string
456
  @param status: the desired status of the instance
457
  @type minmem: string
458
  @param minmem: the minimum memory size of the instance
459
  @type maxmem: string
460
  @param maxmem: the maximum memory size of the instance
461
  @type vcpus: string
462
  @param vcpus: the count of VCPUs the instance has
463
  @type nics: list
464
  @param nics: list of tuples (name, uuid, ip, mac, mode, link, net, netinfo)
465
      representing the NICs the instance has
466
  @type disk_template: string
467
  @param disk_template: the disk template of the instance
468
  @type disks: list
469
  @param disks: list of tuples (name, size, mode)
470
  @type bep: dict
471
  @param bep: the backend parameters for the instance
472
  @type hvp: dict
473
  @param hvp: the hypervisor parameters for the instance
474
  @type hypervisor_name: string
475
  @param hypervisor_name: the hypervisor for the instance
476
  @type tags: list
477
  @param tags: list of instance tags as strings
478
  @rtype: dict
479
  @return: the hook environment for this instance
480

481
  """
482
  env = {
483
    "OP_TARGET": name,
484
    "INSTANCE_NAME": name,
485
    "INSTANCE_PRIMARY": primary_node,
486
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
487
    "INSTANCE_OS_TYPE": os_type,
488
    "INSTANCE_STATUS": status,
489
    "INSTANCE_MINMEM": minmem,
490
    "INSTANCE_MAXMEM": maxmem,
491
    # TODO(2.9) remove deprecated "memory" value
492
    "INSTANCE_MEMORY": maxmem,
493
    "INSTANCE_VCPUS": vcpus,
494
    "INSTANCE_DISK_TEMPLATE": disk_template,
495
    "INSTANCE_HYPERVISOR": hypervisor_name,
496
  }
497
  if nics:
498
    nic_count = len(nics)
499
    for idx, (name, _, ip, mac, mode, link, net, netinfo) in enumerate(nics):
500
      if ip is None:
501
        ip = ""
502
      env["INSTANCE_NIC%d_NAME" % idx] = name
503
      env["INSTANCE_NIC%d_IP" % idx] = ip
504
      env["INSTANCE_NIC%d_MAC" % idx] = mac
505
      env["INSTANCE_NIC%d_MODE" % idx] = mode
506
      env["INSTANCE_NIC%d_LINK" % idx] = link
507
      if netinfo:
508
        nobj = objects.Network.FromDict(netinfo)
509
        env.update(nobj.HooksDict("INSTANCE_NIC%d_" % idx))
510
      elif net:
511
        # FIXME: broken network reference: the instance NIC specifies a
512
        # network, but the relevant network entry was not in the config. This
513
        # should be made impossible.
514
        env["INSTANCE_NIC%d_NETWORK_NAME" % idx] = net
515
      if mode == constants.NIC_MODE_BRIDGED:
516
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
517
  else:
518
    nic_count = 0
519

    
520
  env["INSTANCE_NIC_COUNT"] = nic_count
521

    
522
  if disks:
523
    disk_count = len(disks)
524
    for idx, (name, size, mode) in enumerate(disks):
525
      env["INSTANCE_DISK%d_NAME" % idx] = name
526
      env["INSTANCE_DISK%d_SIZE" % idx] = size
527
      env["INSTANCE_DISK%d_MODE" % idx] = mode
528
  else:
529
    disk_count = 0
530

    
531
  env["INSTANCE_DISK_COUNT"] = disk_count
532

    
533
  if not tags:
534
    tags = []
535

    
536
  env["INSTANCE_TAGS"] = " ".join(tags)
537

    
538
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
539
    for key, value in source.items():
540
      env["INSTANCE_%s_%s" % (kind, key)] = value
541

    
542
  return env
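
# Minimal usage sketch (all values hypothetical; empty NIC/disk/parameter sets
# are used for brevity):
#
#   env = _BuildInstanceHookEnv("inst1.example.com", "node1.example.com", [],
#                               "debian-etch", constants.ADMINST_UP, 128, 256,
#                               1, [], constants.DT_PLAIN, [], {}, {},
#                               constants.HT_XEN_PVM, [])
#   # env["INSTANCE_NAME"] == "inst1.example.com"
#   # env["INSTANCE_NIC_COUNT"] == 0 and env["INSTANCE_DISK_COUNT"] == 0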
543

    
544

    
545
def _NICToTuple(lu, nic):
546
  """Build a tuple of nic information.
547

548
  @type lu:  L{LogicalUnit}
549
  @param lu: the logical unit on whose behalf we execute
550
  @type nic: L{objects.NIC}
551
  @param nic: nic to convert to hooks tuple
552

553
  """
554
  cluster = lu.cfg.GetClusterInfo()
555
  filled_params = cluster.SimpleFillNIC(nic.nicparams)
556
  mode = filled_params[constants.NIC_MODE]
557
  link = filled_params[constants.NIC_LINK]
558
  netinfo = None
559
  if nic.network:
560
    nobj = lu.cfg.GetNetwork(nic.network)
561
    netinfo = objects.Network.ToDict(nobj)
562
  return (nic.name, nic.uuid, nic.ip, nic.mac, mode, link, nic.network, netinfo)
563

    
564

    
565
def _NICListToTuple(lu, nics):
566
  """Build a list of nic information tuples.
567

568
  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
569
  value in LUInstanceQueryData.
570

571
  @type lu:  L{LogicalUnit}
572
  @param lu: the logical unit on whose behalf we execute
573
  @type nics: list of L{objects.NIC}
574
  @param nics: list of nics to convert to hooks tuples
575

576
  """
577
  hooks_nics = []
578
  for nic in nics:
579
    hooks_nics.append(_NICToTuple(lu, nic))
580
  return hooks_nics
581

    
582

    
583
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
584
  """Builds instance-related env variables for hooks from an object.
585

586
  @type lu: L{LogicalUnit}
587
  @param lu: the logical unit on whose behalf we execute
588
  @type instance: L{objects.Instance}
589
  @param instance: the instance for which we should build the
590
      environment
591
  @type override: dict
592
  @param override: dictionary with key/values that will override
593
      our values
594
  @rtype: dict
595
  @return: the hook environment dictionary
596

597
  """
598
  cluster = lu.cfg.GetClusterInfo()
599
  bep = cluster.FillBE(instance)
600
  hvp = cluster.FillHV(instance)
601
  args = {
602
    "name": instance.name,
603
    "primary_node": instance.primary_node,
604
    "secondary_nodes": instance.secondary_nodes,
605
    "os_type": instance.os,
606
    "status": instance.admin_state,
607
    "maxmem": bep[constants.BE_MAXMEM],
608
    "minmem": bep[constants.BE_MINMEM],
609
    "vcpus": bep[constants.BE_VCPUS],
610
    "nics": _NICListToTuple(lu, instance.nics),
611
    "disk_template": instance.disk_template,
612
    "disks": [(disk.name, disk.size, disk.mode)
613
              for disk in instance.disks],
614
    "bep": bep,
615
    "hvp": hvp,
616
    "hypervisor_name": instance.hypervisor,
617
    "tags": instance.tags,
618
  }
619
  if override:
620
    args.update(override)
621
  return _BuildInstanceHookEnv(**args) # pylint: disable=W0142
622

    
623

    
624
def _DecideSelfPromotion(lu, exceptions=None):
625
  """Decide whether I should promote myself as a master candidate.
626

627
  """
628
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
629
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
630
  # the new node will increase mc_max with one, so:
631
  mc_should = min(mc_should + 1, cp_size)
632
  return mc_now < mc_should
633

    
634

    
635
def _CheckNicsBridgesExist(lu, target_nics, target_node):
636
  """Check that the bridges needed by a list of nics exist.
637

638
  """
639
  cluster = lu.cfg.GetClusterInfo()
640
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
641
  brlist = [params[constants.NIC_LINK] for params in paramslist
642
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
643
  if brlist:
644
    result = lu.rpc.call_bridges_exist(target_node, brlist)
645
    result.Raise("Error checking bridges on destination node '%s'" %
646
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
647

    
648

    
649
def _CheckInstanceBridgesExist(lu, instance, node=None):
650
  """Check that the bridges needed by an instance exist.
651

652
  """
653
  if node is None:
654
    node = instance.primary_node
655
  _CheckNicsBridgesExist(lu, instance.nics, node)
656

    
657

    
658
def _CheckOSVariant(os_obj, name):
659
  """Check whether an OS name conforms to the os variants specification.
660

661
  @type os_obj: L{objects.OS}
662
  @param os_obj: OS object to check
663
  @type name: string
664
  @param name: OS name passed by the user, to check for validity
665

666
  """
667
  variant = objects.OS.GetVariant(name)
668
  if not os_obj.supported_variants:
669
    if variant:
670
      raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'"
671
                                 " passed)" % (os_obj.name, variant),
672
                                 errors.ECODE_INVAL)
673
    return
674
  if not variant:
675
    raise errors.OpPrereqError("OS name must include a variant",
676
                               errors.ECODE_INVAL)
677

    
678
  if variant not in os_obj.supported_variants:
679
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
680

    
681

    
682
def _GetNodeInstancesInner(cfg, fn):
683
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
684

    
685

    
686
def _GetNodeInstances(cfg, node_name):
687
  """Returns a list of all primary and secondary instances on a node.
688

689
  """
690

    
691
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
692

    
693

    
694
def _GetNodePrimaryInstances(cfg, node_name):
695
  """Returns primary instances on a node.
696

697
  """
698
  return _GetNodeInstancesInner(cfg,
699
                                lambda inst: node_name == inst.primary_node)
700

    
701

    
702
def _GetNodeSecondaryInstances(cfg, node_name):
703
  """Returns secondary instances on a node.
704

705
  """
706
  return _GetNodeInstancesInner(cfg,
707
                                lambda inst: node_name in inst.secondary_nodes)
708

    
709

    
710
def _GetStorageTypeArgs(cfg, storage_type):
711
  """Returns the arguments for a storage type.
712

713
  """
714
  # Special case for file storage
715
  if storage_type == constants.ST_FILE:
716
    # storage.FileStorage wants a list of storage directories
717
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
718

    
719
  return []
720

    
721

    
722
def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq):
723
  faulty = []
724

    
725
  for dev in instance.disks:
726
    cfg.SetDiskID(dev, node_name)
727

    
728
  result = rpc_runner.call_blockdev_getmirrorstatus(node_name, (instance.disks,
729
                                                                instance))
730
  result.Raise("Failed to get disk status from node %s" % node_name,
731
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
732

    
733
  for idx, bdev_status in enumerate(result.payload):
734
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
735
      faulty.append(idx)
736

    
737
  return faulty
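
# Illustrative use (from within a LU, values hypothetical): returns the
# indices of the instance's disks whose local disk status is LDS_FAULTY on
# node_name.
#
#   faulty = _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
#                                     node_name, True)
#   # e.g. faulty == [1] if only the second disk is reported faulty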
738

    
739

    
740
def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
741
  """Check the sanity of iallocator and node arguments and use the
742
  cluster-wide iallocator if appropriate.
743

744
  Check that at most one of (iallocator, node) is specified. If none is
745
  specified, or the iallocator is L{constants.DEFAULT_IALLOCATOR_SHORTCUT},
746
  then the LU's opcode's iallocator slot is filled with the cluster-wide
747
  default iallocator.
748

749
  @type iallocator_slot: string
750
  @param iallocator_slot: the name of the opcode iallocator slot
751
  @type node_slot: string
752
  @param node_slot: the name of the opcode target node slot
753

754
  """
755
  node = getattr(lu.op, node_slot, None)
756
  ialloc = getattr(lu.op, iallocator_slot, None)
757
  if node == []:
758
    node = None
759

    
760
  if node is not None and ialloc is not None:
761
    raise errors.OpPrereqError("Do not specify both, iallocator and node",
762
                               errors.ECODE_INVAL)
763
  elif ((node is None and ialloc is None) or
764
        ialloc == constants.DEFAULT_IALLOCATOR_SHORTCUT):
765
    default_iallocator = lu.cfg.GetDefaultIAllocator()
766
    if default_iallocator:
767
      setattr(lu.op, iallocator_slot, default_iallocator)
768
    else:
769
      raise errors.OpPrereqError("No iallocator or node given and no"
770
                                 " cluster-wide default iallocator found;"
771
                                 " please specify either an iallocator or a"
772
                                 " node, or set a cluster-wide default"
773
                                 " iallocator", errors.ECODE_INVAL)
774
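
# Usage sketch (illustrative): LUs typically call this from CheckArguments,
# e.g.
#
#   _CheckIAllocatorOrNode(self, "iallocator", "pnode")
#
# If the opcode sets neither slot and the cluster defines a default
# iallocator, self.op.iallocator is filled in; if both slots are set, or
# neither is set and no default exists, errors.OpPrereqError is raised.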

    
775

    
776
def _CheckHostnameSane(lu, name):
777
  """Ensures that a given hostname resolves to a 'sane' name.
778

779
  The given name is required to be a prefix of the resolved hostname,
780
  to prevent accidental mismatches.
781

782
  @param lu: the logical unit on behalf of which we're checking
783
  @param name: the name we should resolve and check
784
  @return: the resolved hostname object
785

786
  """
787
  hostname = netutils.GetHostname(name=name)
788
  if hostname.name != name:
789
    lu.LogInfo("Resolved given name '%s' to '%s'", name, hostname.name)
790
  if not utils.MatchNameComponent(name, [hostname.name]):
791
    raise errors.OpPrereqError(("Resolved hostname '%s' does not look the"
792
                                " same as given hostname '%s'") %
793
                                (hostname.name, name), errors.ECODE_INVAL)
794
  return hostname
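
# Example (hypothetical names):
#
#   hostname = _CheckHostnameSane(lu, "inst1")
#   # Succeeds if "inst1" resolves to e.g. "inst1.example.com" (the given name
#   # is a prefix component of the FQDN); raises errors.OpPrereqError if the
#   # resolver returns an unrelated name.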
795

    
796

    
797
def _WaitForSync(lu, instance, disks=None, oneshot=False):
798
  """Sleep and poll for an instance's disk to sync.
799

800
  """
801
  if not instance.disks or disks is not None and not disks:
802
    return True
803

    
804
  disks = _ExpandCheckDisks(instance, disks)
805

    
806
  if not oneshot:
807
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
808

    
809
  node = instance.primary_node
810

    
811
  for dev in disks:
812
    lu.cfg.SetDiskID(dev, node)
813

    
814
  # TODO: Convert to utils.Retry
815

    
816
  retries = 0
817
  degr_retries = 10 # in seconds, as we sleep 1 second each time
818
  while True:
819
    max_time = 0
820
    done = True
821
    cumul_degraded = False
822
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
823
    msg = rstats.fail_msg
824
    if msg:
825
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
826
      retries += 1
827
      if retries >= 10:
828
        raise errors.RemoteError("Can't contact node %s for mirror data,"
829
                                 " aborting." % node)
830
      time.sleep(6)
831
      continue
832
    rstats = rstats.payload
833
    retries = 0
834
    for i, mstat in enumerate(rstats):
835
      if mstat is None:
836
        lu.LogWarning("Can't compute data for node %s/%s",
837
                           node, disks[i].iv_name)
838
        continue
839

    
840
      cumul_degraded = (cumul_degraded or
841
                        (mstat.is_degraded and mstat.sync_percent is None))
842
      if mstat.sync_percent is not None:
843
        done = False
844
        if mstat.estimated_time is not None:
845
          rem_time = ("%s remaining (estimated)" %
846
                      utils.FormatSeconds(mstat.estimated_time))
847
          max_time = mstat.estimated_time
848
        else:
849
          rem_time = "no time estimate"
850
        lu.LogInfo("- device %s: %5.2f%% done, %s",
851
                   disks[i].iv_name, mstat.sync_percent, rem_time)
852

    
853
    # if we're done but degraded, let's do a few small retries, to
854
    # make sure we see a stable and not transient situation; therefore
855
    # we force restart of the loop
856
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
857
      logging.info("Degraded disks found, %d retries left", degr_retries)
858
      degr_retries -= 1
859
      time.sleep(1)
860
      continue
861

    
862
    if done or oneshot:
863
      break
864

    
865
    time.sleep(min(60, max_time))
866

    
867
  if done:
868
    lu.LogInfo("Instance %s's disks are in sync", instance.name)
869

    
870
  return not cumul_degraded
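
# Typical usage (illustrative): called after creating or activating disks,
# e.g.
#
#   disk_abort = not _WaitForSync(lu, instance)
#
# The return value is True when no disk ended up degraded; callers usually
# treat a False result as a reason to warn or abort.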
871

    
872

    
873
def _BlockdevFind(lu, node, dev, instance):
874
  """Wrapper around call_blockdev_find to annotate diskparams.
875

876
  @param lu: A reference to the lu object
877
  @param node: The node to call out
878
  @param dev: The device to find
879
  @param instance: The instance object the device belongs to
880
  @returns The result of the rpc call
881

882
  """
883
  (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
884
  return lu.rpc.call_blockdev_find(node, disk)
885

    
886

    
887
def _CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
888
  """Wrapper around L{_CheckDiskConsistencyInner}.
889

890
  """
891
  (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
892
  return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
893
                                    ldisk=ldisk)
894

    
895

    
896
def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
897
                               ldisk=False):
898
  """Check that mirrors are not degraded.
899

900
  @attention: The device has to be annotated already.
901

902
  The ldisk parameter, if True, will change the test from the
903
  is_degraded attribute (which represents overall non-ok status for
904
  the device(s)) to the ldisk (representing the local storage status).
905

906
  """
907
  lu.cfg.SetDiskID(dev, node)
908

    
909
  result = True
910

    
911
  if on_primary or dev.AssembleOnSecondary():
912
    rstats = lu.rpc.call_blockdev_find(node, dev)
913
    msg = rstats.fail_msg
914
    if msg:
915
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
916
      result = False
917
    elif not rstats.payload:
918
      lu.LogWarning("Can't find disk on node %s", node)
919
      result = False
920
    else:
921
      if ldisk:
922
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
923
      else:
924
        result = result and not rstats.payload.is_degraded
925

    
926
  if dev.children:
927
    for child in dev.children:
928
      result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
929
                                                     on_primary)
930

    
931
  return result
932

    
933

    
934
class LUOobCommand(NoHooksLU):
935
  """Logical unit for OOB handling.
936

937
  """
938
  REQ_BGL = False
939
  _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)
940

    
941
  def ExpandNames(self):
942
    """Gather locks we need.
943

944
    """
945
    if self.op.node_names:
946
      self.op.node_names = _GetWantedNodes(self, self.op.node_names)
947
      lock_names = self.op.node_names
948
    else:
949
      lock_names = locking.ALL_SET
950

    
951
    self.needed_locks = {
952
      locking.LEVEL_NODE: lock_names,
953
      }
954

    
955
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
956

    
957
    if not self.op.node_names:
958
      # Acquire node allocation lock only if all nodes are affected
959
      self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
960

    
961
  def CheckPrereq(self):
962
    """Check prerequisites.
963

964
    This checks:
965
     - the node exists in the configuration
966
     - OOB is supported
967

968
    Any errors are signaled by raising errors.OpPrereqError.
969

970
    """
971
    self.nodes = []
972
    self.master_node = self.cfg.GetMasterNode()
973

    
974
    assert self.op.power_delay >= 0.0
975

    
976
    if self.op.node_names:
977
      if (self.op.command in self._SKIP_MASTER and
978
          self.master_node in self.op.node_names):
979
        master_node_obj = self.cfg.GetNodeInfo(self.master_node)
980
        master_oob_handler = _SupportsOob(self.cfg, master_node_obj)
981

    
982
        if master_oob_handler:
983
          additional_text = ("run '%s %s %s' if you want to operate on the"
984
                             " master regardless") % (master_oob_handler,
985
                                                      self.op.command,
986
                                                      self.master_node)
987
        else:
988
          additional_text = "it does not support out-of-band operations"
989

    
990
        raise errors.OpPrereqError(("Operating on the master node %s is not"
991
                                    " allowed for %s; %s") %
992
                                   (self.master_node, self.op.command,
993
                                    additional_text), errors.ECODE_INVAL)
994
    else:
995
      self.op.node_names = self.cfg.GetNodeList()
996
      if self.op.command in self._SKIP_MASTER:
997
        self.op.node_names.remove(self.master_node)
998

    
999
    if self.op.command in self._SKIP_MASTER:
1000
      assert self.master_node not in self.op.node_names
1001

    
1002
    for (node_name, node) in self.cfg.GetMultiNodeInfo(self.op.node_names):
1003
      if node is None:
1004
        raise errors.OpPrereqError("Node %s not found" % node_name,
1005
                                   errors.ECODE_NOENT)
1006
      else:
1007
        self.nodes.append(node)
1008

    
1009
      if (not self.op.ignore_status and
1010
          (self.op.command == constants.OOB_POWER_OFF and not node.offline)):
1011
        raise errors.OpPrereqError(("Cannot power off node %s because it is"
1012
                                    " not marked offline") % node_name,
1013
                                   errors.ECODE_STATE)
1014

    
1015
  def Exec(self, feedback_fn):
1016
    """Execute OOB and return result if we expect any.
1017

1018
    """
1019
    master_node = self.master_node
1020
    ret = []
1021

    
1022
    for idx, node in enumerate(utils.NiceSort(self.nodes,
1023
                                              key=lambda node: node.name)):
1024
      node_entry = [(constants.RS_NORMAL, node.name)]
1025
      ret.append(node_entry)
1026

    
1027
      oob_program = _SupportsOob(self.cfg, node)
1028

    
1029
      if not oob_program:
1030
        node_entry.append((constants.RS_UNAVAIL, None))
1031
        continue
1032

    
1033
      logging.info("Executing out-of-band command '%s' using '%s' on %s",
1034
                   self.op.command, oob_program, node.name)
1035
      result = self.rpc.call_run_oob(master_node, oob_program,
1036
                                     self.op.command, node.name,
1037
                                     self.op.timeout)
1038

    
1039
      if result.fail_msg:
1040
        self.LogWarning("Out-of-band RPC failed on node '%s': %s",
1041
                        node.name, result.fail_msg)
1042
        node_entry.append((constants.RS_NODATA, None))
1043
      else:
1044
        try:
1045
          self._CheckPayload(result)
1046
        except errors.OpExecError, err:
1047
          self.LogWarning("Payload returned by node '%s' is not valid: %s",
1048
                          node.name, err)
1049
          node_entry.append((constants.RS_NODATA, None))
1050
        else:
1051
          if self.op.command == constants.OOB_HEALTH:
1052
            # For health we should log important events
1053
            for item, status in result.payload:
1054
              if status in [constants.OOB_STATUS_WARNING,
1055
                            constants.OOB_STATUS_CRITICAL]:
1056
                self.LogWarning("Item '%s' on node '%s' has status '%s'",
1057
                                item, node.name, status)
1058

    
1059
          if self.op.command == constants.OOB_POWER_ON:
1060
            node.powered = True
1061
          elif self.op.command == constants.OOB_POWER_OFF:
1062
            node.powered = False
1063
          elif self.op.command == constants.OOB_POWER_STATUS:
1064
            powered = result.payload[constants.OOB_POWER_STATUS_POWERED]
1065
            if powered != node.powered:
1066
              logging.warning(("Recorded power state (%s) of node '%s' does not"
1067
                               " match actual power state (%s)"), node.powered,
1068
                              node.name, powered)
1069

    
1070
          # For configuration changing commands we should update the node
1071
          if self.op.command in (constants.OOB_POWER_ON,
1072
                                 constants.OOB_POWER_OFF):
1073
            self.cfg.Update(node, feedback_fn)
1074

    
1075
          node_entry.append((constants.RS_NORMAL, result.payload))
1076

    
1077
          if (self.op.command == constants.OOB_POWER_ON and
1078
              idx < len(self.nodes) - 1):
1079
            time.sleep(self.op.power_delay)
1080

    
1081
    return ret
1082

    
1083
  def _CheckPayload(self, result):
1084
    """Checks if the payload is valid.
1085

1086
    @param result: RPC result
1087
    @raises errors.OpExecError: If payload is not valid
1088

1089
    """
1090
    errs = []
1091
    if self.op.command == constants.OOB_HEALTH:
1092
      if not isinstance(result.payload, list):
1093
        errs.append("command 'health' is expected to return a list but got %s" %
1094
                    type(result.payload))
1095
      else:
1096
        for item, status in result.payload:
1097
          if status not in constants.OOB_STATUSES:
1098
            errs.append("health item '%s' has invalid status '%s'" %
1099
                        (item, status))
1100

    
1101
    if self.op.command == constants.OOB_POWER_STATUS:
1102
      if not isinstance(result.payload, dict):
1103
        errs.append("power-status is expected to return a dict but got %s" %
1104
                    type(result.payload))
1105

    
1106
    if self.op.command in [
1107
      constants.OOB_POWER_ON,
1108
      constants.OOB_POWER_OFF,
1109
      constants.OOB_POWER_CYCLE,
1110
      ]:
1111
      if result.payload is not None:
1112
        errs.append("%s is expected to not return payload but got '%s'" %
1113
                    (self.op.command, result.payload))
1114

    
1115
    if errs:
1116
      raise errors.OpExecError("Check of out-of-band payload failed due to %s" %
1117
                               utils.CommaJoin(errs))
1118

    
1119

    
1120
class _OsQuery(_QueryBase):
1121
  FIELDS = query.OS_FIELDS
1122

    
1123
  def ExpandNames(self, lu):
1124
    # Lock all nodes in shared mode
1125
    # Temporary removal of locks, should be reverted later
1126
    # TODO: reintroduce locks when they are lighter-weight
1127
    lu.needed_locks = {}
1128
    #self.share_locks[locking.LEVEL_NODE] = 1
1129
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1130

    
1131
    # The following variables interact with _QueryBase._GetNames
1132
    if self.names:
1133
      self.wanted = self.names
1134
    else:
1135
      self.wanted = locking.ALL_SET
1136

    
1137
    self.do_locking = self.use_locking
1138

    
1139
  def DeclareLocks(self, lu, level):
1140
    pass
1141

    
1142
  @staticmethod
1143
  def _DiagnoseByOS(rlist):
1144
    """Remaps a per-node return list into a per-os per-node dictionary
1145

1146
    @param rlist: a map with node names as keys and OS objects as values
1147

1148
    @rtype: dict
1149
    @return: a dictionary with osnames as keys and as value another
1150
        map, with nodes as keys and tuples of (path, status, diagnose,
1151
        variants, parameters, api_versions) as values, eg::
1152

1153
          {"debian-etch": {"node1": [(/usr/lib/..., True, "", [], []),
1154
                                     (/srv/..., False, "invalid api")],
1155
                           "node2": [(/srv/..., True, "", [], [])]}
1156
          }
1157

1158
    """
1159
    all_os = {}
1160
    # we build here the list of nodes that didn't fail the RPC (at RPC
1161
    # level), so that nodes with a non-responding node daemon don't
1162
    # make all OSes invalid
1163
    good_nodes = [node_name for node_name in rlist
1164
                  if not rlist[node_name].fail_msg]
1165
    for node_name, nr in rlist.items():
1166
      if nr.fail_msg or not nr.payload:
1167
        continue
1168
      for (name, path, status, diagnose, variants,
1169
           params, api_versions) in nr.payload:
1170
        if name not in all_os:
1171
          # build a list of nodes for this os containing empty lists
1172
          # for each node in node_list
1173
          all_os[name] = {}
1174
          for nname in good_nodes:
1175
            all_os[name][nname] = []
1176
        # convert params from [name, help] to (name, help)
1177
        params = [tuple(v) for v in params]
1178
        all_os[name][node_name].append((path, status, diagnose,
1179
                                        variants, params, api_versions))
1180
    return all_os
1181

    
1182
  def _GetQueryData(self, lu):
1183
    """Computes the list of nodes and their attributes.
1184

1185
    """
1186
    # Locking is not used
1187
    assert not (compat.any(lu.glm.is_owned(level)
1188
                           for level in locking.LEVELS
1189
                           if level != locking.LEVEL_CLUSTER) or
1190
                self.do_locking or self.use_locking)
1191

    
1192
    valid_nodes = [node.name
1193
                   for node in lu.cfg.GetAllNodesInfo().values()
1194
                   if not node.offline and node.vm_capable]
1195
    pol = self._DiagnoseByOS(lu.rpc.call_os_diagnose(valid_nodes))
1196
    cluster = lu.cfg.GetClusterInfo()
1197

    
1198
    data = {}
1199

    
1200
    for (os_name, os_data) in pol.items():
1201
      info = query.OsInfo(name=os_name, valid=True, node_status=os_data,
1202
                          hidden=(os_name in cluster.hidden_os),
1203
                          blacklisted=(os_name in cluster.blacklisted_os))
1204

    
1205
      variants = set()
1206
      parameters = set()
1207
      api_versions = set()
1208

    
1209
      for idx, osl in enumerate(os_data.values()):
1210
        info.valid = bool(info.valid and osl and osl[0][1])
1211
        if not info.valid:
1212
          break
1213

    
1214
        (node_variants, node_params, node_api) = osl[0][3:6]
1215
        if idx == 0:
1216
          # First entry
1217
          variants.update(node_variants)
1218
          parameters.update(node_params)
1219
          api_versions.update(node_api)
1220
        else:
1221
          # Filter out inconsistent values
1222
          variants.intersection_update(node_variants)
1223
          parameters.intersection_update(node_params)
1224
          api_versions.intersection_update(node_api)
1225

    
1226
      info.variants = list(variants)
1227
      info.parameters = list(parameters)
1228
      info.api_versions = list(api_versions)
1229

    
1230
      data[os_name] = info
1231

    
1232
    # Prepare data in requested order
1233
    return [data[name] for name in self._GetNames(lu, pol.keys(), None)
1234
            if name in data]
1235

    
1236

    
1237
class LUOsDiagnose(NoHooksLU):
1238
  """Logical unit for OS diagnose/query.
1239

1240
  """
1241
  REQ_BGL = False
1242

    
1243
  @staticmethod
1244
  def _BuildFilter(fields, names):
1245
    """Builds a filter for querying OSes.
1246

1247
    """
1248
    name_filter = qlang.MakeSimpleFilter("name", names)
1249

    
1250
    # Legacy behaviour: Hide hidden, blacklisted or invalid OSes if the
1251
    # respective field is not requested
1252
    status_filter = [[qlang.OP_NOT, [qlang.OP_TRUE, fname]]
1253
                     for fname in ["hidden", "blacklisted"]
1254
                     if fname not in fields]
1255
    if "valid" not in fields:
1256
      status_filter.append([qlang.OP_TRUE, "valid"])
1257

    
1258
    if status_filter:
1259
      status_filter.insert(0, qlang.OP_AND)
1260
    else:
1261
      status_filter = None
1262

    
1263
    if name_filter and status_filter:
1264
      return [qlang.OP_AND, name_filter, status_filter]
1265
    elif name_filter:
1266
      return name_filter
1267
    else:
1268
      return status_filter
1269

    
1270
  def CheckArguments(self):
1271
    self.oq = _OsQuery(self._BuildFilter(self.op.output_fields, self.op.names),
1272
                       self.op.output_fields, False)
1273

    
1274
  def ExpandNames(self):
1275
    self.oq.ExpandNames(self)
1276

    
1277
  def Exec(self, feedback_fn):
1278
    return self.oq.OldStyleQuery(self)
1279

    
1280

    
1281
class _ExtStorageQuery(_QueryBase):
1282
  FIELDS = query.EXTSTORAGE_FIELDS
1283

    
1284
  def ExpandNames(self, lu):
1285
    # Lock all nodes in shared mode
1286
    # Temporary removal of locks, should be reverted later
1287
    # TODO: reintroduce locks when they are lighter-weight
1288
    lu.needed_locks = {}
1289
    #self.share_locks[locking.LEVEL_NODE] = 1
1290
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1291

    
1292
    # The following variables interact with _QueryBase._GetNames
1293
    if self.names:
1294
      self.wanted = self.names
1295
    else:
1296
      self.wanted = locking.ALL_SET
1297

    
1298
    self.do_locking = self.use_locking
1299

    
1300
  def DeclareLocks(self, lu, level):
1301
    pass
1302

    
1303
  @staticmethod
1304
  def _DiagnoseByProvider(rlist):
1305
    """Remaps a per-node return list into a per-provider per-node dictionary
1306

1307
    @param rlist: a map with node names as keys and ExtStorage objects as values
1308

1309
    @rtype: dict
1310
    @return: a dictionary with extstorage providers as keys and as
1311
        value another map, with nodes as keys and tuples of
1312
        (path, status, diagnose, parameters) as values, eg::
1313

1314
          {"provider1": {"node1": [(/usr/lib/..., True, "", [])],
1315
                         "node2": [(/srv/..., False, "missing file")],
1316
                         "node3": [(/srv/..., True, "", [])]},
1317
          }
1318

1319
    """
1320
    all_es = {}
1321
    # we build here the list of nodes that didn't fail the RPC (at RPC
1322
    # level), so that nodes with a non-responding node daemon don't
1323
    # make all OSes invalid
1324
    good_nodes = [node_name for node_name in rlist
1325
                  if not rlist[node_name].fail_msg]
1326
    for node_name, nr in rlist.items():
1327
      if nr.fail_msg or not nr.payload:
1328
        continue
1329
      for (name, path, status, diagnose, params) in nr.payload:
1330
        if name not in all_es:
1331
          # build a list of nodes for this os containing empty lists
1332
          # for each node in node_list
1333
          all_es[name] = {}
1334
          for nname in good_nodes:
1335
            all_es[name][nname] = []
1336
        # convert params from [name, help] to (name, help)
1337
        params = [tuple(v) for v in params]
1338
        all_es[name][node_name].append((path, status, diagnose, params))
1339
    return all_es
1340

    
1341
  def _GetQueryData(self, lu):
1342
    """Computes the list of nodes and their attributes.
1343

1344
    """
1345
    # Locking is not used
1346
    assert not (compat.any(lu.glm.is_owned(level)
1347
                           for level in locking.LEVELS
1348
                           if level != locking.LEVEL_CLUSTER) or
1349
                self.do_locking or self.use_locking)
1350

    
1351
    valid_nodes = [node.name
1352
                   for node in lu.cfg.GetAllNodesInfo().values()
1353
                   if not node.offline and node.vm_capable]
1354
    pol = self._DiagnoseByProvider(lu.rpc.call_extstorage_diagnose(valid_nodes))
1355

    
1356
    data = {}
1357

    
1358
    nodegroup_list = lu.cfg.GetNodeGroupList()
1359

    
1360
    for (es_name, es_data) in pol.items():
1361
      # For every provider compute the nodegroup validity.
1362
      # To do this we need to check the validity of each node in es_data
1363
      # and then construct the corresponding nodegroup dict:
1364
      #      { nodegroup1: status
1365
      #        nodegroup2: status
1366
      #      }
1367
      ndgrp_data = {}
1368
      for nodegroup in nodegroup_list:
1369
        ndgrp = lu.cfg.GetNodeGroup(nodegroup)
1370

    
1371
        nodegroup_nodes = ndgrp.members
1372
        nodegroup_name = ndgrp.name
1373
        node_statuses = []
1374

    
1375
        for node in nodegroup_nodes:
1376
          if node in valid_nodes:
1377
            if es_data[node] != []:
1378
              node_status = es_data[node][0][1]
1379
              node_statuses.append(node_status)
1380
            else:
1381
              node_statuses.append(False)
1382

    
1383
        if False in node_statuses:
1384
          ndgrp_data[nodegroup_name] = False
1385
        else:
1386
          ndgrp_data[nodegroup_name] = True
1387

    
1388
      # Compute the provider's parameters
1389
      parameters = set()
1390
      for idx, esl in enumerate(es_data.values()):
1391
        valid = bool(esl and esl[0][1])
1392
        if not valid:
1393
          break
1394

    
1395
        node_params = esl[0][3]
1396
        if idx == 0:
1397
          # First entry
1398
          parameters.update(node_params)
1399
        else:
1400
          # Filter out inconsistent values
1401
          parameters.intersection_update(node_params)
1402

    
1403
      params = list(parameters)
1404

    
1405
      # Now fill all the info for this provider
1406
      info = query.ExtStorageInfo(name=es_name, node_status=es_data,
1407
                                  nodegroup_status=ndgrp_data,
1408
                                  parameters=params)
1409

    
1410
      data[es_name] = info
1411

    
1412
    # Prepare data in requested order
1413
    return [data[name] for name in self._GetNames(lu, pol.keys(), None)
1414
            if name in data]


class LUExtStorageDiagnose(NoHooksLU):
  """Logical unit for ExtStorage diagnose/query.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.eq = _ExtStorageQuery(qlang.MakeSimpleFilter("name", self.op.names),
                               self.op.output_fields, False)

  def ExpandNames(self):
    self.eq.ExpandNames(self)

  def Exec(self, feedback_fn):
    return self.eq.OldStyleQuery(self)


class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_name)
    except ValueError:
      pass
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_name)
    assert node is not None

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)

    for instance_name, instance in self.cfg.GetAllInstancesInfo().items():
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first" % instance_name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self, exceptions=[node.name])
    self.context.RemoveNode(node.name)

    # Run post hooks on the node before it's removed
    _RunPostHook(self, node.name)

    result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node,
                                              constants.ETC_HOSTS_REMOVE,
                                              node.name, None)
      result.Raise("Can't update hosts file with new host data")
      _RedistributeAncillaryFiles(self)


class _NodeQuery(_QueryBase):
  FIELDS = query.NODE_FIELDS

  def ExpandNames(self, lu):
    lu.needed_locks = {}
    lu.share_locks = _ShareAll()

    if self.names:
      self.wanted = _GetWantedNodes(lu, self.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = (self.use_locking and
                       query.NQ_LIVE in self.requested_data)

    if self.do_locking:
      # If any non-static field is requested we need to lock the nodes
      lu.needed_locks[locking.LEVEL_NODE] = self.wanted
      lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    all_info = lu.cfg.GetAllNodesInfo()

    nodenames = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)

    # Gather data as requested
    if query.NQ_LIVE in self.requested_data:
      # filter out non-vm_capable nodes
      toquery_nodes = [name for name in nodenames if all_info[name].vm_capable]

      es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, toquery_nodes)
      node_data = lu.rpc.call_node_info(toquery_nodes, [lu.cfg.GetVGName()],
                                        [lu.cfg.GetHypervisorType()], es_flags)
      live_data = dict((name, rpc.MakeLegacyNodeInfo(nresult.payload))
                       for (name, nresult) in node_data.items()
                       if not nresult.fail_msg and nresult.payload)
    else:
      live_data = None

    if query.NQ_INST in self.requested_data:
      node_to_primary = dict([(name, set()) for name in nodenames])
      node_to_secondary = dict([(name, set()) for name in nodenames])

      inst_data = lu.cfg.GetAllInstancesInfo()

      for inst in inst_data.values():
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)
    else:
      node_to_primary = None
      node_to_secondary = None

    if query.NQ_OOB in self.requested_data:
      oob_support = dict((name, bool(_SupportsOob(lu.cfg, node)))
                         for name, node in all_info.iteritems())
    else:
      oob_support = None

    if query.NQ_GROUP in self.requested_data:
      groups = lu.cfg.GetAllNodeGroupsInfo()
    else:
      groups = {}

    return query.NodeQueryData([all_info[name] for name in nodenames],
                               live_data, lu.cfg.GetMasterNode(),
                               node_to_primary, node_to_secondary, groups,
                               oob_support, lu.cfg.GetClusterInfo())


class LUNodeQuery(NoHooksLU):
  """Logical unit for querying nodes.

  """
  # pylint: disable=W0142
  REQ_BGL = False

  def CheckArguments(self):
    self.nq = _NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
                         self.op.output_fields, self.op.use_locking)

  def ExpandNames(self):
    self.nq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.nq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    return self.nq.OldStyleQuery(self)


class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = _ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: _GetWantedNodes(self, self.op.nodes),
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.owned_locks(locking.LEVEL_NODE)
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = self.cfg.GetAllInstancesInfo()
    vol2inst = _MapInstanceDisksToNodes(ilist.values())

    output = []
    for node in nodenames:
      nresult = volumes[node]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
        continue

      node_vols = sorted(nresult.payload,
                         key=operator.itemgetter("dev"))

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol["dev"]
          elif field == "vg":
            val = vol["vg"]
          elif field == "name":
            val = vol["name"]
          elif field == "size":
            val = int(float(vol["size"]))
          elif field == "instance":
            val = vol2inst.get((node, vol["vg"] + "/" + vol["name"]), "-")
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output
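    # Illustrative example (hypothetical volume data): with
    # output_fields == ["node", "name", "size"], a single logical volume could
    # yield the row ["node1.example.com", "disk0", "10240"]; every value is
    # stringified above before being appended.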


class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = _ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: _GetWantedNodes(self, self.op.nodes),
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    self.nodes = self.owned_locks(locking.LEVEL_NODE)

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.nodes,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node in utils.NiceSort(self.nodes):
      nresult = data[node]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node
          elif field == constants.SF_TYPE:
            val = self.op.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result


class _InstanceQuery(_QueryBase):
  FIELDS = query.INSTANCE_FIELDS

  def ExpandNames(self, lu):
    lu.needed_locks = {}
    lu.share_locks = _ShareAll()

    if self.names:
      self.wanted = _GetWantedInstances(lu, self.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = (self.use_locking and
                       query.IQ_LIVE in self.requested_data)
    if self.do_locking:
      lu.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      lu.needed_locks[locking.LEVEL_NODEGROUP] = []
      lu.needed_locks[locking.LEVEL_NODE] = []
      lu.needed_locks[locking.LEVEL_NETWORK] = []
      lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

    self.do_grouplocks = (self.do_locking and
                          query.IQ_NODES in self.requested_data)

  def DeclareLocks(self, lu, level):
    if self.do_locking:
      if level == locking.LEVEL_NODEGROUP and self.do_grouplocks:
        assert not lu.needed_locks[locking.LEVEL_NODEGROUP]

        # Lock all groups used by instances optimistically; this requires going
        # via the node before it's locked, requiring verification later on
        lu.needed_locks[locking.LEVEL_NODEGROUP] = \
          set(group_uuid
              for instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
              for group_uuid in lu.cfg.GetInstanceNodeGroups(instance_name))
      elif level == locking.LEVEL_NODE:
        lu._LockInstancesNodes() # pylint: disable=W0212

      elif level == locking.LEVEL_NETWORK:
        lu.needed_locks[locking.LEVEL_NETWORK] = \
          frozenset(net_uuid
                    for instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
                    for net_uuid in lu.cfg.GetInstanceNetworks(instance_name))

  @staticmethod
  def _CheckGroupLocks(lu):
    owned_instances = frozenset(lu.owned_locks(locking.LEVEL_INSTANCE))
    owned_groups = frozenset(lu.owned_locks(locking.LEVEL_NODEGROUP))

    # Check if node groups for locked instances are still correct
    for instance_name in owned_instances:
      _CheckInstanceNodeGroups(lu.cfg, instance_name, owned_groups)

  def _GetQueryData(self, lu):
    """Computes the list of instances and their attributes.

    """
    if self.do_grouplocks:
      self._CheckGroupLocks(lu)

    cluster = lu.cfg.GetClusterInfo()
    all_info = lu.cfg.GetAllInstancesInfo()

    instance_names = self._GetNames(lu, all_info.keys(), locking.LEVEL_INSTANCE)

    instance_list = [all_info[name] for name in instance_names]
    nodes = frozenset(itertools.chain(*(inst.all_nodes
                                        for inst in instance_list)))
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
    bad_nodes = []
    offline_nodes = []
    wrongnode_inst = set()

    # Gather data as requested
    if self.requested_data & set([query.IQ_LIVE, query.IQ_CONSOLE]):
      live_data = {}
      node_data = lu.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result.offline:
          # offline nodes will be in both lists
          assert result.fail_msg
          offline_nodes.append(name)
        if result.fail_msg:
          bad_nodes.append(name)
        elif result.payload:
          for inst in result.payload:
            if inst in all_info:
              if all_info[inst].primary_node == name:
                live_data.update(result.payload)
              else:
                wrongnode_inst.add(inst)
            else:
              # orphan instance; we don't list it here as we don't
              # handle this case yet in the output of instance listing
              logging.warning("Orphan instance '%s' found on node %s",
                              inst, name)
        # else no instance is alive
    else:
      live_data = {}

    if query.IQ_DISKUSAGE in self.requested_data:
      gmi = ganeti.masterd.instance
      disk_usage = dict((inst.name,
                         gmi.ComputeDiskSize(inst.disk_template,
                                             [{constants.IDISK_SIZE: disk.size}
                                              for disk in inst.disks]))
                        for inst in instance_list)
    else:
      disk_usage = None

    if query.IQ_CONSOLE in self.requested_data:
      consinfo = {}
      for inst in instance_list:
        if inst.name in live_data:
          # Instance is running
          consinfo[inst.name] = _GetInstanceConsole(cluster, inst)
        else:
          consinfo[inst.name] = None
      assert set(consinfo.keys()) == set(instance_names)
    else:
      consinfo = None

    if query.IQ_NODES in self.requested_data:
      node_names = set(itertools.chain(*map(operator.attrgetter("all_nodes"),
                                            instance_list)))
      nodes = dict(lu.cfg.GetMultiNodeInfo(node_names))
      groups = dict((uuid, lu.cfg.GetNodeGroup(uuid))
                    for uuid in set(map(operator.attrgetter("group"),
                                        nodes.values())))
    else:
      nodes = None
      groups = None

    if query.IQ_NETWORKS in self.requested_data:
      net_uuids = itertools.chain(*(lu.cfg.GetInstanceNetworks(i.name)
                                    for i in instance_list))
      networks = dict((uuid, lu.cfg.GetNetwork(uuid)) for uuid in net_uuids)
    else:
      networks = None

    return query.InstanceQueryData(instance_list, lu.cfg.GetClusterInfo(),
                                   disk_usage, offline_nodes, bad_nodes,
                                   live_data, wrongnode_inst, consinfo,
                                   nodes, groups, networks)
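    # Illustrative note: a node marked offline ends up in both offline_nodes
    # and bad_nodes above (its RPC result carries fail_msg), so consumers of
    # the returned InstanceQueryData can treat offline_nodes as a subset of
    # bad_nodes.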


class LUQuery(NoHooksLU):
  """Query for resources/items of a certain kind.

  """
  # pylint: disable=W0142
  REQ_BGL = False

  def CheckArguments(self):
    qcls = _GetQueryImplementation(self.op.what)

    self.impl = qcls(self.op.qfilter, self.op.fields, self.op.use_locking)

  def ExpandNames(self):
    self.impl.ExpandNames(self)

  def DeclareLocks(self, level):
    self.impl.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    return self.impl.NewStyleQuery(self)


class LUQueryFields(NoHooksLU):
  """Query for resources/items of a certain kind.

  """
  # pylint: disable=W0142
  REQ_BGL = False

  def CheckArguments(self):
    self.qcls = _GetQueryImplementation(self.op.what)

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    return query.QueryFields(self.qcls.FIELDS, self.op.fields)


class LUNodeModifyStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)

    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_name,
      }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_name,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


class LUNodeAdd(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _NFLAGS = ["master_capable", "vm_capable"]

  def CheckArguments(self):
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
    # validate/normalize the node name
    self.hostname = netutils.GetHostname(name=self.op.node_name,
                                         family=self.primary_ip_family)
    self.op.node_name = self.hostname.name

    if self.op.readd and self.op.node_name == self.cfg.GetMasterNode():
      raise errors.OpPrereqError("Cannot readd the master node",
                                 errors.ECODE_STATE)

    if self.op.readd and self.op.group:
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                 " being readded", errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    # Exclude added node
    pre_nodes = list(set(self.cfg.GetNodeList()) - set([self.op.node_name]))
    post_nodes = pre_nodes + [self.op.node_name, ]

    return (pre_nodes, post_nodes)
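    # Illustrative example (hypothetical names): when adding "node4" to a
    # cluster consisting of node1..node3, the pre phase runs on
    # [node1, node2, node3] and the post phase additionally includes node4.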

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    cfg = self.cfg
    hostname = self.hostname
    node = hostname.name
    primary_ip = self.op.primary_ip = hostname.ip
    if self.op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
                                   " IPv4 address must be given as secondary",
                                   errors.ECODE_INVAL)
      self.op.secondary_ip = primary_ip

    secondary_ip = self.op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip, errors.ECODE_INVAL)

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node, errors.ECODE_EXISTS)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node,
                                 errors.ECODE_NOENT)

    self.changed_primary_ip = False

    for existing_node_name, existing_node in cfg.GetMultiNodeInfo(node_list):
      if self.op.readd and node == existing_node_name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # After this 'if' block, None is no longer a valid value for the
    # _capable op attributes
    if self.op.readd:
      old_node = self.cfg.GetNodeInfo(node)
      assert old_node is not None, "Can't retrieve locked node %s" % node
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, getattr(old_node, attr))
    else:
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, True)

    if self.op.readd and not self.op.vm_capable:
      pri, sec = cfg.GetNodeInstances(node)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % node,
                                   errors.ECODE_STATE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not netutils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                              source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [node]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False

    if self.op.readd:
      self.new_node = old_node
    else:
      node_group = cfg.LookupNodeGroup(self.op.group)
      self.new_node = objects.Node(name=node,
                                   primary_ip=primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=node_group, ndparams={})

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      _CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                            "node", "cluster or group")

    if self.op.hv_state:
      self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state, None)

    if self.op.disk_state:
      self.new_disk_state = _MergeAndVerifyDiskState(self.op.disk_state, None)

    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
    #       it a property on the base class.
    rpcrunner = rpc.DnsOnlyRunner()
    result = rpcrunner.call_version([node])[node]
    result.Raise("Can't get version information from node %s" % node)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node, result.payload)
    else:
      raise errors.OpPrereqError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.payload),
                                 errors.ECODE_ENVIRON)

    vg_name = cfg.GetVGName()
    if vg_name is not None:
      vparams = {constants.NV_PVLIST: [vg_name]}
      excl_stor = _IsExclusiveStorageEnabledNode(cfg, self.new_node)
      cname = self.cfg.GetClusterName()
      result = rpcrunner.call_node_verify_light([node], vparams, cname)[node]
      (errmsgs, _) = _CheckNodePVs(result.payload, excl_stor)
      if errmsgs:
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # We are adding a new node, so we assume it's powered
    new_node.powered = True

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      new_node.drained = new_node.offline = False # pylint: disable=W0201
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      new_node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        new_node.primary_ip = self.op.primary_ip

    # copy the master/vm_capable flags
    for attr in self._NFLAGS:
      setattr(new_node, attr, getattr(self.op, attr))

    # notify the user about any possible mc promotion
    if new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    if self.op.ndparams:
      new_node.ndparams = self.op.ndparams
    else:
      new_node.ndparams = {}

    if self.op.hv_state:
      new_node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      new_node.disk_state_static = self.new_disk_state

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node,
                                              constants.ETC_HOSTS_ADD,
                                              self.hostname.name,
                                              self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    if new_node.secondary_ip != new_node.primary_ip:
      _CheckNodeHasSecondaryIP(self, new_node.name, new_node.secondary_ip,
                               False)

    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: ([node], {}),
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed")

    if self.op.readd:
      _RedistributeAncillaryFiles(self)
      self.context.ReaddNode(new_node)
      # make sure we redistribute the config
      self.cfg.Update(new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(new_node.name)
        msg = result.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself from master"
                          " candidate status: %s" % msg)
    else:
      _RedistributeAncillaryFiles(self, additional_nodes=[node],
                                  additional_vm=self.op.vm_capable)
      self.context.AddNode(new_node, self.proc.GetECId())


class LUNodeSetParams(LogicalUnit):
  """Modifies the parameters of a node.

  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
      to the node role (as _ROLE_*)
  @cvar _R2F: a dictionary from node role to tuples of flags
  @cvar _FLAGS: a list of attribute names corresponding to the flags

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
  _F2R = {
    (True, False, False): _ROLE_CANDIDATE,
    (False, True, False): _ROLE_DRAINED,
    (False, False, True): _ROLE_OFFLINE,
    (False, False, False): _ROLE_REGULAR,
    }
  _R2F = dict((v, k) for k, v in _F2R.items())
  _FLAGS = ["master_candidate", "drained", "offline"]
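  # Illustrative example of the mapping above: the flag tuple
  # (master_candidate=False, drained=False, offline=True) maps to _ROLE_OFFLINE
  # via _F2R, and _R2F[_ROLE_OFFLINE] yields the same tuple back, which Exec()
  # unpacks into the node object whenever the role changes.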

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
                self.op.master_capable, self.op.vm_capable,
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
                self.op.disk_state]
    if all_mods.count(None) == len(all_mods):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # Boolean value that tells us whether we might be demoting from MC
    self.might_demote = (self.op.master_candidate is False or
                         self.op.offline is True or
                         self.op.drained is True or
                         self.op.master_capable is False)

    if self.op.secondary_ip:
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                   " address" % self.op.secondary_ip,
                                   errors.ECODE_INVAL)

    self.lock_all = self.op.auto_promote and self.might_demote
    self.lock_instances = self.op.secondary_ip is not None

  def _InstanceFilter(self, instance):
    """Filter for getting affected instances.

    """
    return (instance.disk_template in constants.DTS_INT_MIRROR and
            self.op.node_name in instance.all_nodes)

  def ExpandNames(self):
    if self.lock_all:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,

        # Block allocations when all nodes are locked
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: self.op.node_name,
        }

    # Since modifying a node can have severe effects on currently running
    # operations the resource lock is at least acquired in shared mode
    self.needed_locks[locking.LEVEL_NODE_RES] = \
      self.needed_locks[locking.LEVEL_NODE]

    # Get all locks except nodes in shared mode; they are not used for anything
    # but read-only access
    self.share_locks = _ShareAll()
    self.share_locks[locking.LEVEL_NODE] = 0
    self.share_locks[locking.LEVEL_NODE_RES] = 0
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 0

    if self.lock_instances:
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        frozenset(self.cfg.GetInstancesInfoByFilter(self._InstanceFilter))

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode(), self.op.node_name]
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)

    if self.lock_instances:
      affected_instances = \
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)

      # Verify instance locks
      owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
      wanted_instances = frozenset(affected_instances.keys())
      if wanted_instances - owned_instances:
        raise errors.OpPrereqError("Instances affected by changing node %s's"
                                   " secondary IP address have changed since"
                                   " locks were acquired, wanted '%s', have"
                                   " '%s'; retry the operation" %
                                   (self.op.node_name,
                                    utils.CommaJoin(wanted_instances),
                                    utils.CommaJoin(owned_instances)),
                                   errors.ECODE_STATE)
    else:
      affected_instances = None

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if self.op.node_name == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via master-failover",
                                   errors.ECODE_INVAL)

    if self.op.master_candidate and not node.master_capable:
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
                                 " it a master candidate" % node.name,
                                 errors.ECODE_STATE)

    if self.op.vm_capable is False:
      (ipri, isec) = self.cfg.GetNodeInstances(self.op.node_name)
      if ipri or isec:
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
                                   " the vm_capable flag" % node.name,
                                   errors.ECODE_STATE)

    if node.master_candidate and self.might_demote and not self.lock_all:
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
      # check if after removing the current node, we're missing master
      # candidates
      (mc_remaining, mc_should, _) = \
          self.cfg.GetMasterCandidateStats(exceptions=[node.name])
      if mc_remaining < mc_should:
        raise errors.OpPrereqError("Not enough master candidates, please"
                                   " pass auto promote option to allow"
                                   " promotion (--auto-promote or RAPI"
                                   " auto_promote=True)", errors.ECODE_STATE)

    self.old_flags = old_flags = (node.master_candidate,
                                  node.drained, node.offline)
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
    self.old_role = old_role = self._F2R[old_flags]

    # Check for ineffective changes
    for attr in self._FLAGS:
      if (getattr(self.op, attr) is False and getattr(node, attr) is False):
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
        setattr(self.op, attr, None)

    # Past this point, any flag change to False means a transition
    # away from the respective state, as only real changes are kept

    # TODO: We might query the real power state if it supports OOB
    if _SupportsOob(self.cfg, node):
      if self.op.offline is False and not (node.powered or
                                           self.op.powered is True):
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
                                    " offline status can be reset") %
                                   self.op.node_name, errors.ECODE_STATE)
    elif self.op.powered is not None:
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
                                  " as it does not support out-of-band"
                                  " handling") % self.op.node_name,
                                 errors.ECODE_STATE)

    # If we're being deofflined/drained, we'll MC ourself if needed
    if (self.op.drained is False or self.op.offline is False or
        (self.op.master_capable and not node.master_capable)):
      if _DecideSelfPromotion(self):
        self.op.master_candidate = True
        self.LogInfo("Auto-promoting node to master candidate")

    # If we're no longer master capable, we'll demote ourselves from MC
    if self.op.master_capable is False and node.master_candidate:
      self.LogInfo("Demoting from master candidate")
      self.op.master_candidate = False

    # Compute new role
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
    if self.op.master_candidate:
      new_role = self._ROLE_CANDIDATE
    elif self.op.drained:
      new_role = self._ROLE_DRAINED
    elif self.op.offline:
      new_role = self._ROLE_OFFLINE
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
      # False is still in new flags, which means we're un-setting (the
      # only) True flag
      new_role = self._ROLE_REGULAR
    else: # no new flags, nothing, keep old role
      new_role = old_role

    self.new_role = new_role

    if old_role == self._ROLE_OFFLINE and new_role != old_role:
      # Trying to transition out of offline status
      result = self.rpc.call_version([node.name])[node.name]
      if result.fail_msg:
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
                                   " to report its version: %s" %
                                   (node.name, result.fail_msg),
                                   errors.ECODE_STATE)
      else:
        self.LogWarning("Transitioning node from offline to online state"
                        " without using re-add. Please make sure the node"
                        " is healthy!")

    # When changing the secondary ip, verify if this is a single-homed to
    # multi-homed transition or vice versa, and apply the relevant
    # restrictions.
    if self.op.secondary_ip:
      # Ok even without locking, because this can't be changed by any LU
      master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
      master_singlehomed = master.secondary_ip == master.primary_ip
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
        if self.op.force and node.name == master.name:
          self.LogWarning("Transitioning from single-homed to multi-homed"
                          " cluster; all nodes will require a secondary IP"
                          " address")
        else:
          raise errors.OpPrereqError("Changing the secondary ip on a"
                                     " single-homed cluster requires the"
                                     " --force option to be passed, and the"
                                     " target node to be the master",
                                     errors.ECODE_INVAL)
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
        if self.op.force and node.name == master.name:
          self.LogWarning("Transitioning from multi-homed to single-homed"
                          " cluster; secondary IP addresses will have to be"
                          " removed")
        else:
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
                                     " same as the primary IP on a multi-homed"
                                     " cluster, unless the --force option is"
                                     " passed, and the target node is the"
                                     " master", errors.ECODE_INVAL)

      assert not (frozenset(affected_instances) -
                  self.owned_locks(locking.LEVEL_INSTANCE))

      if node.offline:
        if affected_instances:
          msg = ("Cannot change secondary IP address: offline node has"
                 " instances (%s) configured to use it" %
                 utils.CommaJoin(affected_instances.keys()))
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
      else:
        # On online nodes, check that no instances are running, and that
        # the node has the new ip and we can reach it.
        for instance in affected_instances.values():
          _CheckInstanceState(self, instance, INSTANCE_DOWN,
                              msg="cannot change secondary ip")

        _CheckNodeHasSecondaryIP(self, node.name, self.op.secondary_ip, True)
        if master.name != node.name:
          # check reachability from master secondary ip to new secondary ip
          if not netutils.TcpPing(self.op.secondary_ip,
                                  constants.DEFAULT_NODED_PORT,
                                  source=master.secondary_ip):
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                       " based ping to node daemon port",
                                       errors.ECODE_ENVIRON)

    if self.op.ndparams:
      new_ndparams = _GetUpdatedParams(self.node.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      _CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                            "node", "cluster or group")
      self.new_ndparams = new_ndparams

    if self.op.hv_state:
      self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state,
                                                 self.node.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        _MergeAndVerifyDiskState(self.op.disk_state,
                                 self.node.disk_state_static)

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.node
    old_role = self.old_role
    new_role = self.new_role

    result = []

    if self.op.ndparams:
      node.ndparams = self.new_ndparams

    if self.op.powered is not None:
      node.powered = self.op.powered

    if self.op.hv_state:
      node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      node.disk_state_static = self.new_disk_state

    for attr in ["master_capable", "vm_capable"]:
      val = getattr(self.op, attr)
      if val is not None:
        setattr(node, attr, val)
        result.append((attr, str(val)))

    if new_role != old_role:
      # Tell the node to demote itself, if no longer MC and not offline
      if old_role == self._ROLE_CANDIDATE and new_role != self._ROLE_OFFLINE:
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s", msg)

      new_flags = self._R2F[new_role]
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
        if of != nf:
          result.append((desc, str(nf)))
      (node.master_candidate, node.drained, node.offline) = new_flags

      # we locked all nodes, we adjust the CP before updating this node
      if self.lock_all:
        _AdjustCandidatePool(self, [node.name])

    if self.op.secondary_ip:
      node.secondary_ip = self.op.secondary_ip
      result.append(("secondary_ip", self.op.secondary_ip))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)

    # this will trigger job queue propagation or cleanup if the mc
    # flag changed
    if [old_role, new_role].count(self._ROLE_CANDIDATE) == 1:
      self.context.ReaddNode(node)

    return result


class LUNodePowercycle(NoHooksLU):
  """Powercycles a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    if self.op.node_name == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    result = self.rpc.call_node_powercycle(self.op.node_name,
                                           self.cfg.GetHypervisorType())
    result.Raise("Failed to schedule the reboot")
    return result.payload


class LUInstanceActivateDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              _AssembleInstanceDisks(self, self.instance,
                                     ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    if self.op.wait_for_sync:
      if not _WaitForSync(self, self.instance):
        raise errors.OpExecError("Some disks of the instance are degraded!")

    return disks_info


def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
2763
                           ignore_size=False):
2764
  """Prepare the block devices for an instance.
2765

2766
  This sets up the block devices on all nodes.
2767

2768
  @type lu: L{LogicalUnit}
2769
  @param lu: the logical unit on whose behalf we execute
2770
  @type instance: L{objects.Instance}
2771
  @param instance: the instance for whose disks we assemble
2772
  @type disks: list of L{objects.Disk} or None
2773
  @param disks: which disks to assemble (or all, if None)
2774
  @type ignore_secondaries: boolean
2775
  @param ignore_secondaries: if true, errors on secondary nodes
2776
      won't result in an error return from the function
2777
  @type ignore_size: boolean
2778
  @param ignore_size: if true, the current known size of the disk
2779
      will not be used during the disk activation, useful for cases
2780
      when the size is wrong
2781
  @return: False if the operation failed, otherwise a list of
2782
      (host, instance_visible_name, node_visible_name)
2783
      with the mapping from node devices to instance devices
2784

2785
  """
2786
  device_info = []
2787
  disks_ok = True
2788
  iname = instance.name
2789
  disks = _ExpandCheckDisks(instance, disks)
2790

    
2791
  # With the two passes mechanism we try to reduce the window of
2792
  # opportunity for the race condition of switching DRBD to primary
2793
  # before handshaking occured, but we do not eliminate it
2794

    
2795
  # The proper fix would be to wait (with some limits) until the
2796
  # connection has been made and drbd transitions from WFConnection
2797
  # into any other network-connected state (Connected, SyncTarget,
2798
  # SyncSource, etc.)
2799

    
2800
  # 1st pass, assemble on all nodes in secondary mode
2801
  for idx, inst_disk in enumerate(disks):
2802
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2803
      if ignore_size:
2804
        node_disk = node_disk.Copy()
2805
        node_disk.UnsetSize()
2806
      lu.cfg.SetDiskID(node_disk, node)
2807
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
2808
                                             False, idx)
2809
      msg = result.fail_msg
2810
      if msg:
2811
        is_offline_secondary = (node in instance.secondary_nodes and
2812
                                result.offline)
2813
        lu.LogWarning("Could not prepare block device %s on node %s"
2814
                      " (is_primary=False, pass=1): %s",
2815
                      inst_disk.iv_name, node, msg)
2816
        if not (ignore_secondaries or is_offline_secondary):
2817
          disks_ok = False
2818

    
2819
  # FIXME: race condition on drbd migration to primary
2820

    
2821
  # 2nd pass, do only the primary node
2822
  for idx, inst_disk in enumerate(disks):
2823
    dev_path = None
2824

    
2825
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2826
      if node != instance.primary_node:
2827
        continue
2828
      if ignore_size:
2829
        node_disk = node_disk.Copy()
2830
        node_disk.UnsetSize()
2831
      lu.cfg.SetDiskID(node_disk, node)
2832
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
2833
                                             True, idx)
2834
      msg = result.fail_msg
2835
      if msg:
2836
        lu.LogWarning("Could not prepare block device %s on node %s"
2837
                      " (is_primary=True, pass=2): %s",
2838
                      inst_disk.iv_name, node, msg)
2839
        disks_ok = False
2840
      else:
2841
        dev_path = result.payload
2842

    
2843
    device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
2844

    
2845
  # leave the disks configured for the primary node
2846
  # this is a workaround that would be fixed better by
2847
  # improving the logical/physical id handling
2848
  for disk in disks:
2849
    lu.cfg.SetDiskID(disk, instance.primary_node)
2850

    
2851
  return disks_ok, device_info
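

# Illustrative sketch only (not part of the original module): this shows how
# the (disks_ok, device_info) pair returned above is typically consumed. The
# helper name _ExampleLogAssembledDisks and its arguments are hypothetical.
def _ExampleLogAssembledDisks(lu, instance):
  """Example only: assemble all disks of an instance and log their paths.

  """
  disks_ok, device_info = _AssembleInstanceDisks(lu, instance)
  if not disks_ok:
    raise errors.OpExecError("Cannot activate block devices")
  # each entry is a (node_name, instance_visible_name, node_device_path) tuple
  for node, iv_name, dev_path in device_info:
    lu.LogInfo("Disk %s of instance %s is visible on node %s as %s",
               iv_name, instance.name, node, dev_path)
  return device_info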
2852

    
2853

    
2854
def _StartInstanceDisks(lu, instance, force):
2855
  """Start the disks of an instance.
2856

2857
  """
2858
  disks_ok, _ = _AssembleInstanceDisks(lu, instance,
2859
                                           ignore_secondaries=force)
2860
  if not disks_ok:
2861
    _ShutdownInstanceDisks(lu, instance)
2862
    if force is not None and not force:
2863
      lu.LogWarning("",
2864
                    hint=("If the message above refers to a secondary node,"
2865
                          " you can retry the operation using '--force'"))
2866
    raise errors.OpExecError("Disk consistency error")
2867

    
2868

    
2869
class LUInstanceDeactivateDisks(NoHooksLU):
2870
  """Shutdown an instance's disks.
2871

2872
  """
2873
  REQ_BGL = False
2874

    
2875
  def ExpandNames(self):
2876
    self._ExpandAndLockInstance()
2877
    self.needed_locks[locking.LEVEL_NODE] = []
2878
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2879

    
2880
  def DeclareLocks(self, level):
2881
    if level == locking.LEVEL_NODE:
2882
      self._LockInstancesNodes()
2883

    
2884
  def CheckPrereq(self):
2885
    """Check prerequisites.
2886

2887
    This checks that the instance is in the cluster.
2888

2889
    """
2890
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2891
    assert self.instance is not None, \
2892
      "Cannot retrieve locked instance %s" % self.op.instance_name
2893

    
2894
  def Exec(self, feedback_fn):
2895
    """Deactivate the disks
2896

2897
    """
2898
    instance = self.instance
2899
    if self.op.force:
2900
      _ShutdownInstanceDisks(self, instance)
2901
    else:
2902
      _SafeShutdownInstanceDisks(self, instance)
2903

    
2904

    
2905
def _SafeShutdownInstanceDisks(lu, instance, disks=None):
2906
  """Shutdown block devices of an instance.
2907

2908
  This function checks if an instance is running, before calling
2909
  _ShutdownInstanceDisks.
2910

2911
  """
2912
  _CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
2913
  _ShutdownInstanceDisks(lu, instance, disks=disks)
2914

    
2915

    
2916
def _ExpandCheckDisks(instance, disks):
2917
  """Return the instance disks selected by the disks list
2918

2919
  @type disks: list of L{objects.Disk} or None
2920
  @param disks: selected disks
2921
  @rtype: list of L{objects.Disk}
2922
  @return: selected instance disks to act on
2923

2924
  """
2925
  if disks is None:
2926
    return instance.disks
2927
  else:
2928
    if not set(disks).issubset(instance.disks):
2929
      raise errors.ProgrammerError("Can only act on disks belonging to the"
2930
                                   " target instance")
2931
    return disks
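

# Illustrative sketch only (not part of the original module): _ExpandCheckDisks
# accepts either None (meaning all disks) or disk objects that already belong
# to the instance, so callers select a subset by slicing or filtering
# instance.disks. The helper name below is hypothetical.
def _ExampleSelectFirstDisk(instance):
  """Example only: return a one-element disk list accepted by these helpers.

  """
  # Slicing instance.disks keeps the subset relationship intact; passing
  # freshly built Disk objects would trigger the ProgrammerError above.
  return _ExpandCheckDisks(instance, instance.disks[:1])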
2932

    
2933

    
2934
def _ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
2935
  """Shutdown block devices of an instance.
2936

2937
  This does the shutdown on all nodes of the instance.
2938

2939
  If ignore_primary is false, errors on the primary node are not
  ignored and make the function report failure; errors from offline
  secondary nodes are always ignored.
2941

2942
  """
2943
  all_result = True
2944
  disks = _ExpandCheckDisks(instance, disks)
2945

    
2946
  for disk in disks:
2947
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2948
      lu.cfg.SetDiskID(top_disk, node)
2949
      result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
2950
      msg = result.fail_msg
2951
      if msg:
2952
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2953
                      disk.iv_name, node, msg)
2954
        if ((node == instance.primary_node and not ignore_primary) or
2955
            (node != instance.primary_node and not result.offline)):
2956
          all_result = False
2957
  return all_result
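

# Illustrative sketch only (not part of the original module): a best-effort
# cleanup that shuts down a single disk on all nodes while tolerating errors
# on the primary node. The helper name is hypothetical.
def _ExampleForceShutdownFirstDisk(lu, instance):
  """Example only: best-effort shutdown of the first disk on all nodes.

  """
  ok = _ShutdownInstanceDisks(lu, instance, disks=instance.disks[:1],
                              ignore_primary=True)
  if not ok:
    lu.LogWarning("Some block devices of instance %s could not be shut down",
                  instance.name)
  return ok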
2958

    
2959

    
2960
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2961
  """Checks if a node has enough free memory.
2962

2963
  This function checks if a given node has the needed amount of free
2964
  memory. In case the node has less memory or we cannot get the
2965
  information from the node, this function raises an OpPrereqError
2966
  exception.
2967

2968
  @type lu: C{LogicalUnit}
2969
  @param lu: a logical unit from which we get configuration data
2970
  @type node: C{str}
2971
  @param node: the node to check
2972
  @type reason: C{str}
2973
  @param reason: string to use in the error message
2974
  @type requested: C{int}
2975
  @param requested: the amount of memory in MiB to check for
2976
  @type hypervisor_name: C{str}
2977
  @param hypervisor_name: the hypervisor to ask for memory stats
2978
  @rtype: integer
2979
  @return: node current free memory
2980
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2981
      we cannot check the node
2982

2983
  """
2984
  nodeinfo = lu.rpc.call_node_info([node], None, [hypervisor_name], False)
2985
  nodeinfo[node].Raise("Can't get data from node %s" % node,
2986
                       prereq=True, ecode=errors.ECODE_ENVIRON)
2987
  (_, _, (hv_info, )) = nodeinfo[node].payload
2988

    
2989
  free_mem = hv_info.get("memory_free", None)
2990
  if not isinstance(free_mem, int):
2991
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2992
                               " was '%s'" % (node, free_mem),
2993
                               errors.ECODE_ENVIRON)
2994
  if requested > free_mem:
2995
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2996
                               " needed %s MiB, available %s MiB" %
2997
                               (node, reason, requested, free_mem),
2998
                               errors.ECODE_NORES)
2999
  return free_mem
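

# Illustrative sketch only (not part of the original module): how a logical
# unit would typically use _CheckNodeFreeMemory before starting an instance,
# along the lines of LUInstanceStartup.CheckPrereq further below. The helper
# name is hypothetical.
def _ExampleCheckMemoryForStart(lu, instance):
  """Example only: check free memory on the primary node.

  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  return _CheckNodeFreeMemory(lu, instance.primary_node,
                              "starting instance %s" % instance.name,
                              bep[constants.BE_MINMEM], instance.hypervisor)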
3000

    
3001

    
3002
def _CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
3003
  """Checks if nodes have enough free disk space in all the VGs.
3004

3005
  This function checks if all given nodes have the needed amount of
3006
  free disk. In case any node has less disk or we cannot get the
3007
  information from the node, this function raises an OpPrereqError
3008
  exception.
3009

3010
  @type lu: C{LogicalUnit}
3011
  @param lu: a logical unit from which we get configuration data
3012
  @type nodenames: C{list}
3013
  @param nodenames: the list of node names to check
3014
  @type req_sizes: C{dict}
3015
  @param req_sizes: the hash of vg and corresponding amount of disk in
3016
      MiB to check for
3017
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
3018
      or we cannot check the node
3019

3020
  """
3021
  for vg, req_size in req_sizes.items():
3022
    _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
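

# Illustrative note (not part of the original module): req_sizes maps volume
# group names to the total amount of disk space (in MiB) required from each,
# for example (the VG names and sizes below are hypothetical):
#
#   _CheckNodesFreeDiskPerVG(lu, ["node1.example.com", "node2.example.com"],
#                            {"xenvg": 10240, "metavg": 256})
#
# An errors.OpPrereqError is raised if any listed node lacks the space in any
# of the volume groups.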
3023

    
3024

    
3025
def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
3026
  """Checks if nodes have enough free disk space in the specified VG.
3027

3028
  This function checks if all given nodes have the needed amount of
3029
  free disk. In case any node has less disk or we cannot get the
3030
  information from the node, this function raises an OpPrereqError
3031
  exception.
3032

3033
  @type lu: C{LogicalUnit}
3034
  @param lu: a logical unit from which we get configuration data
3035
  @type nodenames: C{list}
3036
  @param nodenames: the list of node names to check
3037
  @type vg: C{str}
3038
  @param vg: the volume group to check
3039
  @type requested: C{int}
3040
  @param requested: the amount of disk in MiB to check for
3041
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
3042
      or we cannot check the node
3043

3044
  """
3045
  es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
3046
  nodeinfo = lu.rpc.call_node_info(nodenames, [vg], None, es_flags)
3047
  for node in nodenames:
3048
    info = nodeinfo[node]
3049
    info.Raise("Cannot get current information from node %s" % node,
3050
               prereq=True, ecode=errors.ECODE_ENVIRON)
3051
    (_, (vg_info, ), _) = info.payload
3052
    vg_free = vg_info.get("vg_free", None)
3053
    if not isinstance(vg_free, int):
3054
      raise errors.OpPrereqError("Can't compute free disk space on node"
3055
                                 " %s for vg %s, result was '%s'" %
3056
                                 (node, vg, vg_free), errors.ECODE_ENVIRON)
3057
    if requested > vg_free:
3058
      raise errors.OpPrereqError("Not enough disk space on target node %s"
3059
                                 " vg %s: required %d MiB, available %d MiB" %
3060
                                 (node, vg, requested, vg_free),
3061
                                 errors.ECODE_NORES)
3062

    
3063

    
3064
def _CheckNodesPhysicalCPUs(lu, nodenames, requested, hypervisor_name):
3065
  """Checks if nodes have enough physical CPUs
3066

3067
  This function checks if all given nodes have the needed number of
3068
  physical CPUs. In case any node has less CPUs or we cannot get the
3069
  information from the node, this function raises an OpPrereqError
3070
  exception.
3071

3072
  @type lu: C{LogicalUnit}
3073
  @param lu: a logical unit from which we get configuration data
3074
  @type nodenames: C{list}
3075
  @param nodenames: the list of node names to check
3076
  @type requested: C{int}
  @param requested: the minimum acceptable number of physical CPUs
  @type hypervisor_name: C{str}
  @param hypervisor_name: the hypervisor to ask for CPU information
3078
  @raise errors.OpPrereqError: if the node doesn't have enough CPUs,
3079
      or we cannot check the node
3080

3081
  """
3082
  nodeinfo = lu.rpc.call_node_info(nodenames, None, [hypervisor_name], None)
3083
  for node in nodenames:
3084
    info = nodeinfo[node]
3085
    info.Raise("Cannot get current information from node %s" % node,
3086
               prereq=True, ecode=errors.ECODE_ENVIRON)
3087
    (_, _, (hv_info, )) = info.payload
3088
    num_cpus = hv_info.get("cpu_total", None)
3089
    if not isinstance(num_cpus, int):
3090
      raise errors.OpPrereqError("Can't compute the number of physical CPUs"
3091
                                 " on node %s, result was '%s'" %
3092
                                 (node, num_cpus), errors.ECODE_ENVIRON)
3093
    if requested > num_cpus:
3094
      raise errors.OpPrereqError("Node %s has %s physical CPUs, but %s are "
3095
                                 "required" % (node, num_cpus, requested),
3096
                                 errors.ECODE_NORES)
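

# Illustrative note (not part of the original module): a typical call requiring
# at least four physical CPUs on a set of nodes, with the node list being
# hypothetical:
#
#   _CheckNodesPhysicalCPUs(lu, ["node1.example.com"], 4,
#                           lu.cfg.GetHypervisorType())
#
# As with the other _CheckNodes* helpers, failures surface as
# errors.OpPrereqError with ECODE_NORES or ECODE_ENVIRON.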
3097

    
3098

    
3099
class LUInstanceStartup(LogicalUnit):
3100
  """Starts an instance.
3101

3102
  """
3103
  HPATH = "instance-start"
3104
  HTYPE = constants.HTYPE_INSTANCE
3105
  REQ_BGL = False
3106

    
3107
  def CheckArguments(self):
3108
    # extra beparams
3109
    if self.op.beparams:
3110
      # fill the beparams dict
3111
      objects.UpgradeBeParams(self.op.beparams)
3112
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
3113

    
3114
  def ExpandNames(self):
3115
    self._ExpandAndLockInstance()
3116
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
3117

    
3118
  def DeclareLocks(self, level):
3119
    if level == locking.LEVEL_NODE_RES:
3120
      self._LockInstancesNodes(primary_only=True, level=locking.LEVEL_NODE_RES)
3121

    
3122
  def BuildHooksEnv(self):
3123
    """Build hooks env.
3124

3125
    This runs on master, primary and secondary nodes of the instance.
3126

3127
    """
3128
    env = {
3129
      "FORCE": self.op.force,
3130
      }
3131

    
3132
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3133

    
3134
    return env
3135

    
3136
  def BuildHooksNodes(self):
3137
    """Build hooks nodes.
3138

3139
    """
3140
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3141
    return (nl, nl)
3142

    
3143
  def CheckPrereq(self):
3144
    """Check prerequisites.
3145

3146
    This checks that the instance is in the cluster.
3147

3148
    """
3149
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3150
    assert self.instance is not None, \
3151
      "Cannot retrieve locked instance %s" % self.op.instance_name
3152

    
3153
    # extra hvparams
3154
    if self.op.hvparams:
3155
      # check hypervisor parameter syntax (locally)
3156
      cluster = self.cfg.GetClusterInfo()
3157
      utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
3158
      filled_hvp = cluster.FillHV(instance)
3159
      filled_hvp.update(self.op.hvparams)
3160
      hv_type = hypervisor.GetHypervisorClass(instance.hypervisor)
3161
      hv_type.CheckParameterSyntax(filled_hvp)
3162
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
3163

    
3164
    _CheckInstanceState(self, instance, INSTANCE_ONLINE)
3165

    
3166
    self.primary_offline = self.cfg.GetNodeInfo(instance.primary_node).offline
3167

    
3168
    if self.primary_offline and self.op.ignore_offline_nodes:
3169
      self.LogWarning("Ignoring offline primary node")
3170

    
3171
      if self.op.hvparams or self.op.beparams:
3172
        self.LogWarning("Overridden parameters are ignored")
3173
    else:
3174
      _CheckNodeOnline(self, instance.primary_node)
3175

    
3176
      bep = self.cfg.GetClusterInfo().FillBE(instance)
3177
      bep.update(self.op.beparams)
3178

    
3179
      # check bridges existence
3180
      _CheckInstanceBridgesExist(self, instance)
3181

    
3182
      remote_info = self.rpc.call_instance_info(instance.primary_node,
3183
                                                instance.name,
3184
                                                instance.hypervisor)
3185
      remote_info.Raise("Error checking node %s" % instance.primary_node,
3186
                        prereq=True, ecode=errors.ECODE_ENVIRON)
3187
      if not remote_info.payload: # not running already
3188
        _CheckNodeFreeMemory(self, instance.primary_node,
3189
                             "starting instance %s" % instance.name,
3190
                             bep[constants.BE_MINMEM], instance.hypervisor)
3191

    
3192
  def Exec(self, feedback_fn):
3193
    """Start the instance.
3194

3195
    """
3196
    instance = self.instance
3197
    force = self.op.force
3198
    reason = self.op.reason
3199

    
3200
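    # Record the requested administrative state first (unless no_remember is
    # set); if the primary node is offline, only the configuration is updated
    # and the actual startup below is skipped.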
    if not self.op.no_remember:
3201
      self.cfg.MarkInstanceUp(instance.name)
3202

    
3203
    if self.primary_offline:
3204
      assert self.op.ignore_offline_nodes
3205
      self.LogInfo("Primary node offline, marked instance as started")
3206
    else:
3207
      node_current = instance.primary_node
3208

    
3209
      _StartInstanceDisks(self, instance, force)
3210

    
3211
      result = \
3212
        self.rpc.call_instance_start(node_current,
3213
                                     (instance, self.op.hvparams,
3214
                                      self.op.beparams),
3215
                                     self.op.startup_paused, reason)
3216
      msg = result.fail_msg
3217
      if msg:
3218
        _ShutdownInstanceDisks(self, instance)
3219
        raise errors.OpExecError("Could not start instance: %s" % msg)
3220

    
3221

    
3222
class LUInstanceReboot(LogicalUnit):
3223
  """Reboot an instance.
3224

3225
  """
3226
  HPATH = "instance-reboot"
3227
  HTYPE = constants.HTYPE_INSTANCE
3228
  REQ_BGL = False
3229

    
3230
  def ExpandNames(self):
3231
    self._ExpandAndLockInstance()
3232

    
3233
  def BuildHooksEnv(self):
3234
    """Build hooks env.
3235

3236
    This runs on master, primary and secondary nodes of the instance.
3237

3238
    """
3239
    env = {
3240
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3241
      "REBOOT_TYPE": self.op.reboot_type,
3242
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
3243
      }
3244

    
3245
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3246

    
3247
    return env
3248

    
3249
  def BuildHooksNodes(self):
3250
    """Build hooks nodes.
3251

3252
    """
3253
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3254
    return (nl, nl)
3255

    
3256
  def CheckPrereq(self):
3257
    """Check prerequisites.
3258

3259
    This checks that the instance is in the cluster.
3260

3261
    """
3262
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3263
    assert self.instance is not None, \
3264
      "Cannot retrieve locked instance %s" % self.op.instance_name
3265
    _CheckInstanceState(self, instance, INSTANCE_ONLINE)
3266
    _CheckNodeOnline(self, instance.primary_node)
3267

    
3268
    # check bridges existence
3269
    _CheckInstanceBridgesExist(self, instance)
3270

    
3271
  def Exec(self, feedback_fn):
3272
    """Reboot the instance.
3273

3274
    """
3275
    instance = self.instance
3276
    ignore_secondaries = self.op.ignore_secondaries
3277
    reboot_type = self.op.reboot_type
3278
    reason = self.op.reason
3279

    
3280
    remote_info = self.rpc.call_instance_info(instance.primary_node,
3281
                                              instance.name,
3282
                                              instance.hypervisor)
3283
    remote_info.Raise("Error checking node %s" % instance.primary_node)
3284
    instance_running = bool(remote_info.payload)
3285

    
3286
    node_current = instance.primary_node
3287

    
3288
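    # Soft and hard reboots are delegated to the hypervisor on the primary
    # node; a full reboot (or rebooting an instance that is not running) is
    # emulated below by shutting the instance down and starting it again.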
    if instance_running and reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3289
                                            constants.INSTANCE_REBOOT_HARD]:
3290
      for disk in instance.disks:
3291
        self.cfg.SetDiskID(disk, node_current)
3292
      result = self.rpc.call_instance_reboot(node_current, instance,
3293
                                             reboot_type,
3294
                                             self.op.shutdown_timeout, reason)
3295
      result.Raise("Could not reboot instance")
3296
    else:
3297
      if instance_running:
3298
        result = self.rpc.call_instance_shutdown(node_current, instance,
3299
                                                 self.op.shutdown_timeout,
3300
                                                 reason)
3301
        result.Raise("Could not shutdown instance for full reboot")
3302
        _ShutdownInstanceDisks(self, instance)
3303
      else:
3304
        self.LogInfo("Instance %s was already stopped, starting now",
3305
                     instance.name)
3306
      _StartInstanceDisks(self, instance, ignore_secondaries)
3307
      result = self.rpc.call_instance_start(node_current,
3308
                                            (instance, None, None), False,
3309
                                             reason)
3310
      msg = result.fail_msg
3311
      if msg:
3312
        _ShutdownInstanceDisks(self, instance)
3313
        raise errors.OpExecError("Could not start instance for"
3314
                                 " full reboot: %s" % msg)
3315

    
3316
    self.cfg.MarkInstanceUp(instance.name)
3317

    
3318

    
3319
class LUInstanceShutdown(LogicalUnit):
3320
  """Shutdown an instance.
3321

3322
  """
3323
  HPATH = "instance-stop"
3324
  HTYPE = constants.HTYPE_INSTANCE
3325
  REQ_BGL = False
3326

    
3327
  def ExpandNames(self):
3328
    self._ExpandAndLockInstance()
3329

    
3330
  def BuildHooksEnv(self):
3331
    """Build hooks env.
3332

3333
    This runs on master, primary and secondary nodes of the instance.
3334

3335
    """
3336
    env = _BuildInstanceHookEnvByObject(self, self.instance)
3337
    env["TIMEOUT"] = self.op.timeout
3338
    return env
3339

    
3340
  def BuildHooksNodes(self):
3341
    """Build hooks nodes.
3342

3343
    """
3344
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3345
    return (nl, nl)
3346

    
3347
  def CheckPrereq(self):
3348
    """Check prerequisites.
3349

3350
    This checks that the instance is in the cluster.
3351

3352
    """
3353
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3354
    assert self.instance is not None, \
3355
      "Cannot retrieve locked instance %s" % self.op.instance_name
3356

    
3357
    if not self.op.force:
3358
      _CheckInstanceState(self, self.instance, INSTANCE_ONLINE)
3359
    else:
3360
      self.LogWarning("Ignoring offline instance check")
3361

    
3362
    self.primary_offline = \
3363
      self.cfg.GetNodeInfo(self.instance.primary_node).offline
3364

    
3365
    if self.primary_offline and self.op.ignore_offline_nodes:
3366
      self.LogWarning("Ignoring offline primary node")
3367
    else:
3368
      _CheckNodeOnline(self, self.instance.primary_node)
3369

    
3370
  def Exec(self, feedback_fn):
3371
    """Shutdown the instance.
3372

3373
    """
3374
    instance = self.instance
3375
    node_current = instance.primary_node
3376
    timeout = self.op.timeout
3377
    reason = self.op.reason
3378

    
3379
    # If the instance is offline we shouldn't mark it as down, as that
3380
    # resets the offline flag.
3381
    if not self.op.no_remember and instance.admin_state in INSTANCE_ONLINE:
3382
      self.cfg.MarkInstanceDown(instance.name)
3383

    
3384
    if self.primary_offline:
3385
      assert self.op.ignore_offline_nodes
3386
      self.LogInfo("Primary node offline, marked instance as stopped")
3387
    else:
3388
      result = self.rpc.call_instance_shutdown(node_current, instance, timeout,
3389
                                               reason)
3390
      msg = result.fail_msg
3391
      if msg:
3392
        self.LogWarning("Could not shutdown instance: %s", msg)
3393

    
3394
      _ShutdownInstanceDisks(self, instance)
3395

    
3396

    
3397
class LUInstanceReinstall(LogicalUnit):
3398
  """Reinstall an instance.
3399

3400
  """
3401
  HPATH = "instance-reinstall"
3402
  HTYPE = constants.HTYPE_INSTANCE
3403
  REQ_BGL = False
3404

    
3405
  def ExpandNames(self):
3406
    self._ExpandAndLockInstance()
3407

    
3408
  def BuildHooksEnv(self):
3409
    """Build hooks env.
3410

3411
    This runs on master, primary and secondary nodes of the instance.
3412

3413
    """
3414
    return _BuildInstanceHookEnvByObject(self, self.instance)
3415

    
3416
  def BuildHooksNodes(self):
3417
    """Build hooks nodes.
3418

3419
    """
3420
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3421
    return (nl, nl)
3422

    
3423
  def CheckPrereq(self):
3424
    """Check prerequisites.
3425

3426
    This checks that the instance is in the cluster and is not running.
3427

3428
    """
3429
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3430
    assert instance is not None, \
3431
      "Cannot retrieve locked instance %s" % self.op.instance_name
3432
    _CheckNodeOnline(self, instance.primary_node, "Instance primary node"
3433
                     " offline, cannot reinstall")
3434

    
3435
    if instance.disk_template == constants.DT_DISKLESS:
3436
      raise errors.OpPrereqError("Instance '%s' has no disks" %
3437
                                 self.op.instance_name,
3438
                                 errors.ECODE_INVAL)
3439
    _CheckInstanceState(self, instance, INSTANCE_DOWN, msg="cannot reinstall")
3440

    
3441
    if self.op.os_type is not None:
3442
      # OS verification
3443
      pnode = _ExpandNodeName(self.cfg, instance.primary_node)
3444
      _CheckNodeHasOS(self, pnode, self.op.os_type, self.op.force_variant)
3445
      instance_os = self.op.os_type
3446
    else:
3447
      instance_os = instance.os
3448

    
3449
    nodelist = list(instance.all_nodes)
3450

    
3451
    if self.op.osparams:
3452
      i_osdict = _GetUpdatedParams(instance.osparams, self.op.osparams)
3453
      _CheckOSParams(self, True, nodelist, instance_os, i_osdict)
3454
      self.os_inst = i_osdict # the new dict (without defaults)
3455
    else:
3456
      self.os_inst = None
3457

    
3458
    self.instance = instance
3459

    
3460
  def Exec(self, feedback_fn):
3461
    """Reinstall the instance.
3462

3463
    """
3464
    inst = self.instance
3465

    
3466
    if self.op.os_type is not None:
3467
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3468
      inst.os = self.op.os_type
3469
      # Write to configuration
3470
      self.cfg.Update(inst, feedback_fn)
3471

    
3472
    _StartInstanceDisks(self, inst, None)
3473
    try:
3474
      feedback_fn("Running the instance OS create scripts...")
3475
      # FIXME: pass debug option from opcode to backend
3476
      result = self.rpc.call_instance_os_add(inst.primary_node,
3477
                                             (inst, self.os_inst), True,
3478
                                             self.op.debug_level)
3479
      result.Raise("Could not install OS for instance %s on node %s" %
3480
                   (inst.name, inst.primary_node))
3481
    finally:
3482
      _ShutdownInstanceDisks(self, inst)
3483

    
3484

    
3485
class LUInstanceRecreateDisks(LogicalUnit):
3486
  """Recreate an instance's missing disks.
3487

3488
  """
3489
  HPATH = "instance-recreate-disks"
3490
  HTYPE = constants.HTYPE_INSTANCE
3491
  REQ_BGL = False
3492

    
3493
  _MODIFYABLE = compat.UniqueFrozenset([
3494
    constants.IDISK_SIZE,
3495
    constants.IDISK_MODE,
3496
    ])
3497

    
3498
  # New or changed disk parameters may have different semantics
3499
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
3500
    constants.IDISK_ADOPT,
3501

    
3502
    # TODO: Implement support changing VG while recreating
3503
    constants.IDISK_VG,
3504
    constants.IDISK_METAVG,
3505
    constants.IDISK_PROVIDER,
3506
    constants.IDISK_NAME,
3507
    ]))
3508

    
3509
  def _RunAllocator(self):
3510
    """Run the allocator based on input opcode.
3511

3512
    """
3513
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
3514

    
3515
    # FIXME
3516
    # The allocator should actually run in "relocate" mode, but current
3517
    # allocators don't support relocating all the nodes of an instance at
3518
    # the same time. As a workaround we use "allocate" mode, but this is
3519
    # suboptimal for two reasons:
3520
    # - The instance name passed to the allocator is present in the list of
3521
    #   existing instances, so there could be a conflict within the
3522
    #   internal structures of the allocator. This doesn't happen with the
3523
    #   current allocators, but it's a liability.
3524
    # - The allocator counts the resources used by the instance twice: once
3525
    #   because the instance exists already, and once because it tries to
3526
    #   allocate a new instance.
3527
    # The allocator could choose some of the nodes on which the instance is
3528
    # running, but that's not a problem. If the instance nodes are broken,
3529
    # they should already be marked as drained or offline, and hence
3530
    # skipped by the allocator. If instance disks have been lost for other
3531
    # reasons, then recreating the disks on the same nodes should be fine.
3532
    disk_template = self.instance.disk_template
3533
    spindle_use = be_full[constants.BE_SPINDLE_USE]
3534
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
3535
                                        disk_template=disk_template,
3536
                                        tags=list(self.instance.GetTags()),
3537
                                        os=self.instance.os,
3538
                                        nics=[{}],
3539
                                        vcpus=be_full[constants.BE_VCPUS],
3540
                                        memory=be_full[constants.BE_MAXMEM],
3541
                                        spindle_use=spindle_use,
3542
                                        disks=[{constants.IDISK_SIZE: d.size,
3543
                                                constants.IDISK_MODE: d.mode}
3544
                                                for d in self.instance.disks],
3545
                                        hypervisor=self.instance.hypervisor,
3546
                                        node_whitelist=None)
3547
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
3548

    
3549
    ial.Run(self.op.iallocator)
3550

    
3551
    assert req.RequiredNodes() == len(self.instance.all_nodes)
3552

    
3553
    if not ial.success:
3554
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
3555
                                 " %s" % (self.op.iallocator, ial.info),
3556
                                 errors.ECODE_NORES)
3557

    
3558
    self.op.nodes = ial.result
3559
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3560
                 self.op.instance_name, self.op.iallocator,
3561
                 utils.CommaJoin(ial.result))
3562

    
3563
  def CheckArguments(self):
3564
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
3565
      # Normalize and convert deprecated list of disk indices
3566
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
3567

    
3568
    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
3569
    if duplicates:
3570
      raise errors.OpPrereqError("Some disks have been specified more than"
3571
                                 " once: %s" % utils.CommaJoin(duplicates),
3572
                                 errors.ECODE_INVAL)
3573

    
3574
    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
3575
    # when neither iallocator nor nodes are specified
3576
    if self.op.iallocator or self.op.nodes:
3577
      _CheckIAllocatorOrNode(self, "iallocator", "nodes")
3578

    
3579
    for (idx, params) in self.op.disks:
3580
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
3581
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
3582
      if unsupported:
3583
        raise errors.OpPrereqError("Parameters for disk %s try to change"
3584
                                   " unmodifyable parameter(s): %s" %
3585
                                   (idx, utils.CommaJoin(unsupported)),
3586
                                   errors.ECODE_INVAL)
3587

    
3588
  def ExpandNames(self):
3589
    self._ExpandAndLockInstance()
3590
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3591

    
3592
    if self.op.nodes:
3593
      self.op.nodes = [_ExpandNodeName(self.cfg, n) for n in self.op.nodes]
3594
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
3595
    else:
3596
      self.needed_locks[locking.LEVEL_NODE] = []
3597
      if self.op.iallocator:
3598
        # iallocator will select a new node in the same group
3599
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
3600
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
3601

    
3602
    self.needed_locks[locking.LEVEL_NODE_RES] = []
3603

    
3604
  def DeclareLocks(self, level):
3605
    if level == locking.LEVEL_NODEGROUP:
3606
      assert self.op.iallocator is not None
3607
      assert not self.op.nodes
3608
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
3609
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
3610
      # Lock the primary group used by the instance optimistically; this
3611
      # requires going via the node before it's locked, requiring
3612
      # verification later on
3613
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
3614
        self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)
3615

    
3616
    elif level == locking.LEVEL_NODE:
3617
      # If an allocator is used, then we lock all the nodes in the current
3618
      # instance group, as we don't know yet which ones will be selected;
3619
      # if we replace the nodes without using an allocator, locks are
3620
      # already declared in ExpandNames; otherwise, we need to lock all the
3621
      # instance nodes for disk re-creation
3622
      if self.op.iallocator:
3623
        assert not self.op.nodes
3624
        assert not self.needed_locks[locking.LEVEL_NODE]
3625
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
3626

    
3627
        # Lock member nodes of the group of the primary node
3628
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
3629
          self.needed_locks[locking.LEVEL_NODE].extend(
3630
            self.cfg.GetNodeGroup(group_uuid).members)
3631

    
3632
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
3633
      elif not self.op.nodes:
3634
        self._LockInstancesNodes(primary_only=False)
3635
    elif level == locking.LEVEL_NODE_RES:
3636
      # Copy node locks
3637
      self.needed_locks[locking.LEVEL_NODE_RES] = \
3638
        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
3639

    
3640
  def BuildHooksEnv(self):
3641
    """Build hooks env.
3642

3643
    This runs on master, primary and secondary nodes of the instance.
3644

3645
    """
3646
    return _BuildInstanceHookEnvByObject(self, self.instance)
3647

    
3648
  def BuildHooksNodes(self):
3649
    """Build hooks nodes.
3650

3651
    """
3652
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3653
    return (nl, nl)
3654

    
3655
  def CheckPrereq(self):
3656
    """Check prerequisites.
3657

3658
    This checks that the instance is in the cluster and is not running.
3659

3660
    """
3661
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3662
    assert instance is not None, \
3663
      "Cannot retrieve locked instance %s" % self.op.instance_name
3664
    if self.op.nodes:
3665
      if len(self.op.nodes) != len(instance.all_nodes):
3666
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
3667
                                   " %d replacement nodes were specified" %
3668
                                   (instance.name, len(instance.all_nodes),
3669
                                    len(self.op.nodes)),
3670
                                   errors.ECODE_INVAL)
3671
      assert instance.disk_template != constants.DT_DRBD8 or \
3672
          len(self.op.nodes) == 2
3673
      assert instance.disk_template != constants.DT_PLAIN or \
3674
          len(self.op.nodes) == 1
3675
      primary_node = self.op.nodes[0]
3676
    else:
3677
      primary_node = instance.primary_node
3678
    if not self.op.iallocator:
3679
      _CheckNodeOnline(self, primary_node)
3680

    
3681
    if instance.disk_template == constants.DT_DISKLESS:
3682
      raise errors.OpPrereqError("Instance '%s' has no disks" %
3683
                                 self.op.instance_name, errors.ECODE_INVAL)
3684

    
3685
    # Verify if node group locks are still correct
3686
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
3687
    if owned_groups:
3688
      # Node group locks are acquired only for the primary node (and only
3689
      # when the allocator is used)
3690
      _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
3691
                               primary_only=True)
3692

    
3693
    # if we replace nodes *and* the old primary is offline, we don't
3694
    # check the instance state
3695
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
3696
    if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
3697
      _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
3698
                          msg="cannot recreate disks")
3699

    
3700
    if self.op.disks:
3701
      self.disks = dict(self.op.disks)
3702
    else:
3703
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
3704

    
3705
    maxidx = max(self.disks.keys())
3706
    if maxidx >= len(instance.disks):
3707
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
3708
                                 errors.ECODE_INVAL)
3709

    
3710
    if ((self.op.nodes or self.op.iallocator) and
3711
        sorted(self.disks.keys()) != range(len(instance.disks))):
3712
      raise errors.OpPrereqError("Can't recreate disks partially and"
3713
                                 " change the nodes at the same time",
3714
                                 errors.ECODE_INVAL)
3715

    
3716
    self.instance = instance
3717

    
3718
    if self.op.iallocator:
3719
      self._RunAllocator()
3720
      # Release unneeded node and node resource locks
3721
      _ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
3722
      _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
3723
      _ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
3724

    
3725
    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
3726

    
3727
  def Exec(self, feedback_fn):
3728
    """Recreate the disks.
3729

3730
    """
3731
    instance = self.instance
3732

    
3733
    assert (self.owned_locks(locking.LEVEL_NODE) ==
3734
            self.owned_locks(locking.LEVEL_NODE_RES))
3735

    
3736
    to_skip = []
3737
    mods = [] # keeps track of needed changes
3738

    
3739
    for idx, disk in enumerate(instance.disks):
3740
      try:
3741
        changes = self.disks[idx]
3742
      except KeyError:
3743
        # Disk should not be recreated
3744
        to_skip.append(idx)
3745
        continue
3746

    
3747
      # update secondaries for disks, if needed
3748
      if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
3749
        # need to update the nodes and minors
3750
        assert len(self.op.nodes) == 2
3751
        assert len(disk.logical_id) == 6 # otherwise disk internals
3752
                                         # have changed
3753
        (_, _, old_port, _, _, old_secret) = disk.logical_id
3754
        new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
3755
        new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
3756
                  new_minors[0], new_minors[1], old_secret)
3757
        assert len(disk.logical_id) == len(new_id)
3758
      else:
3759
        new_id = None
3760

    
3761
      mods.append((idx, new_id, changes))
3762

    
3763
    # now that we have passed all asserts above, we can apply the mods
3764
    # in a single run (to avoid partial changes)
3765
    for idx, new_id, changes in mods:
3766
      disk = instance.disks[idx]
3767
      if new_id is not None:
3768
        assert disk.dev_type == constants.LD_DRBD8
3769
        disk.logical_id = new_id
3770
      if changes:
3771
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
3772
                    mode=changes.get(constants.IDISK_MODE, None))
3773

    
3774
    # change primary node, if needed
3775
    if self.op.nodes:
3776
      instance.primary_node = self.op.nodes[0]
3777
      self.LogWarning("Changing the instance's nodes, you will have to"
3778
                      " remove any disks left on the older nodes manually")
3779

    
3780
    if self.op.nodes:
3781
      self.cfg.Update(instance, feedback_fn)
3782

    
3783
    # All touched nodes must be locked
3784
    mylocks = self.owned_locks(locking.LEVEL_NODE)
3785
    assert mylocks.issuperset(frozenset(instance.all_nodes))
3786
    _CreateDisks(self, instance, to_skip=to_skip)
3787

    
3788

    
3789
class LUInstanceRename(LogicalUnit):
3790
  """Rename an instance.
3791

3792
  """
3793
  HPATH = "instance-rename"
3794
  HTYPE = constants.HTYPE_INSTANCE
3795

    
3796
  def CheckArguments(self):
3797
    """Check arguments.
3798

3799
    """
3800
    if self.op.ip_check and not self.op.name_check:
3801
      # TODO: make the ip check more flexible and not depend on the name check
3802
      raise errors.OpPrereqError("IP address check requires a name check",
3803
                                 errors.ECODE_INVAL)
3804

    
3805
  def BuildHooksEnv(self):
3806
    """Build hooks env.
3807

3808
    This runs on master, primary and secondary nodes of the instance.
3809

3810
    """
3811
    env = _BuildInstanceHookEnvByObject(self, self.instance)
3812
    env["INSTANCE_NEW_NAME"] = self.op.new_name
3813
    return env
3814

    
3815
  def BuildHooksNodes(self):
3816
    """Build hooks nodes.
3817

3818
    """
3819
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3820
    return (nl, nl)
3821

    
3822
  def CheckPrereq(self):
3823
    """Check prerequisites.
3824

3825
    This checks that the instance is in the cluster and is not running.
3826

3827
    """
3828
    self.op.instance_name = _ExpandInstanceName(self.cfg,
3829
                                                self.op.instance_name)
3830
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3831
    assert instance is not None
3832
    _CheckNodeOnline(self, instance.primary_node)
3833
    _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
3834
                        msg="cannot rename")
3835
    self.instance = instance
3836

    
3837
    new_name = self.op.new_name
3838
    if self.op.name_check:
3839
      hostname = _CheckHostnameSane(self, new_name)
3840
      new_name = self.op.new_name = hostname.name
3841
      if (self.op.ip_check and
3842
          netutils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT)):
3843
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3844
                                   (hostname.ip, new_name),
3845
                                   errors.ECODE_NOTUNIQUE)
3846

    
3847
    instance_list = self.cfg.GetInstanceList()
3848
    if new_name in instance_list and new_name != instance.name:
3849
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3850
                                 new_name, errors.ECODE_EXISTS)
3851

    
3852
  def Exec(self, feedback_fn):
3853
    """Rename the instance.
3854

3855
    """
3856
    inst = self.instance
3857
    old_name = inst.name
3858

    
3859
    rename_file_storage = False
3860
    if (inst.disk_template in constants.DTS_FILEBASED and
3861
        self.op.new_name != inst.name):
3862
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3863
      rename_file_storage = True
3864

    
3865
    self.cfg.RenameInstance(inst.name, self.op.new_name)
3866
    # Change the instance lock. This is definitely safe while we hold the BGL.
3867
    # Otherwise the new lock would have to be added in acquired mode.
3868
    assert self.REQ_BGL
3869
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER)
3870
    self.glm.remove(locking.LEVEL_INSTANCE, old_name)
3871
    self.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3872

    
3873
    # re-read the instance from the configuration after rename
3874
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
3875

    
3876
    if rename_file_storage:
3877
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3878
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3879
                                                     old_file_storage_dir,
3880
                                                     new_file_storage_dir)
3881
      result.Raise("Could not rename on node %s directory '%s' to '%s'"
3882
                   " (but the instance has been renamed in Ganeti)" %
3883
                   (inst.primary_node, old_file_storage_dir,
3884
                    new_file_storage_dir))
3885

    
3886
    _StartInstanceDisks(self, inst, None)
3887
    # update info on disks
3888
    info = _GetInstanceInfoText(inst)
3889
    for (idx, disk) in enumerate(inst.disks):
3890
      for node in inst.all_nodes:
3891
        self.cfg.SetDiskID(disk, node)
3892
        result = self.rpc.call_blockdev_setinfo(node, disk, info)
3893
        if result.fail_msg:
3894
          self.LogWarning("Error setting info on node %s for disk %s: %s",
3895
                          node, idx, result.fail_msg)
3896
    try:
3897
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3898
                                                 old_name, self.op.debug_level)
3899
      msg = result.fail_msg
3900
      if msg:
3901
        msg = ("Could not run OS rename script for instance %s on node %s"
3902
               " (but the instance has been renamed in Ganeti): %s" %
3903
               (inst.name, inst.primary_node, msg))
3904
        self.LogWarning(msg)
3905
    finally:
3906
      _ShutdownInstanceDisks(self, inst)
3907

    
3908
    return inst.name
3909

    
3910

    
3911
class LUInstanceRemove(LogicalUnit):
3912
  """Remove an instance.
3913

3914
  """
3915
  HPATH = "instance-remove"
3916
  HTYPE = constants.HTYPE_INSTANCE
3917
  REQ_BGL = False
3918

    
3919
  def ExpandNames(self):
3920
    self._ExpandAndLockInstance()
3921
    self.needed_locks[locking.LEVEL_NODE] = []
3922
    self.needed_locks[locking.LEVEL_NODE_RES] = []
3923
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3924

    
3925
  def DeclareLocks(self, level):
3926
    if level == locking.LEVEL_NODE:
3927
      self._LockInstancesNodes()
3928
    elif level == locking.LEVEL_NODE_RES:
3929
      # Copy node locks
3930
      self.needed_locks[locking.LEVEL_NODE_RES] = \
3931
        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
3932

    
3933
  def BuildHooksEnv(self):
3934
    """Build hooks env.
3935

3936
    This runs on master, primary and secondary nodes of the instance.
3937

3938
    """
3939
    env = _BuildInstanceHookEnvByObject(self, self.instance)
3940
    env["SHUTDOWN_TIMEOUT"] = self.op.shutdown_timeout
3941
    return env
3942

    
3943
  def BuildHooksNodes(self):
3944
    """Build hooks nodes.
3945

3946
    """
3947
    nl = [self.cfg.GetMasterNode()]
3948
    nl_post = list(self.instance.all_nodes) + nl
3949
    return (nl, nl_post)
3950

    
3951
  def CheckPrereq(self):
3952
    """Check prerequisites.
3953

3954
    This checks that the instance is in the cluster.
3955

3956
    """
3957
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3958
    assert self.instance is not None, \
3959
      "Cannot retrieve locked instance %s" % self.op.instance_name
3960

    
3961
  def Exec(self, feedback_fn):
3962
    """Remove the instance.
3963

3964
    """
3965
    instance = self.instance
3966
    logging.info("Shutting down instance %s on node %s",
3967
                 instance.name, instance.primary_node)
3968

    
3969
    result = self.rpc.call_instance_shutdown(instance.primary_node, instance,
3970
                                             self.op.shutdown_timeout,
3971
                                             self.op.reason)
3972
    msg = result.fail_msg
3973
    if msg:
3974
      if self.op.ignore_failures:
3975
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
3976
      else:
3977
        raise errors.OpExecError("Could not shutdown instance %s on"
3978
                                 " node %s: %s" %
3979
                                 (instance.name, instance.primary_node, msg))
3980

    
3981
    assert (self.owned_locks(locking.LEVEL_NODE) ==
3982
            self.owned_locks(locking.LEVEL_NODE_RES))
3983
    assert not (set(instance.all_nodes) -
3984
                self.owned_locks(locking.LEVEL_NODE)), \
3985
      "Not owning correct locks"
3986

    
3987
    _RemoveInstance(self, feedback_fn, instance, self.op.ignore_failures)
3988

    
3989

    
3990
def _RemoveInstance(lu, feedback_fn, instance, ignore_failures):
3991
  """Utility function to remove an instance.
3992

3993
  """
3994
  logging.info("Removing block devices for instance %s", instance.name)
3995

    
3996
  if not _RemoveDisks(lu, instance, ignore_failures=ignore_failures):
3997
    if not ignore_failures:
3998
      raise errors.OpExecError("Can't remove instance's disks")
3999
    feedback_fn("Warning: can't remove instance's disks")
4000

    
4001
  logging.info("Removing instance %s out of cluster config", instance.name)
4002

    
4003
  lu.cfg.RemoveInstance(instance.name)
4004

    
4005
  assert not lu.remove_locks.get(locking.LEVEL_INSTANCE), \
4006
    "Instance lock removal conflict"
4007

    
4008
  # Remove lock for the instance
4009
  lu.remove_locks[locking.LEVEL_INSTANCE] = instance.name
4010

    
4011

    
4012
class LUInstanceQuery(NoHooksLU):
4013
  """Logical unit for querying instances.
4014

4015
  """
4016
  # pylint: disable=W0142
4017
  REQ_BGL = False
4018

    
4019
  def CheckArguments(self):
4020
    self.iq = _InstanceQuery(qlang.MakeSimpleFilter("name", self.op.names),
4021
                             self.op.output_fields, self.op.use_locking)
4022

    
4023
  def ExpandNames(self):
4024
    self.iq.ExpandNames(self)
4025

    
4026
  def DeclareLocks(self, level):
4027
    self.iq.DeclareLocks(self, level)
4028

    
4029
  def Exec(self, feedback_fn):
4030
    return self.iq.OldStyleQuery(self)
4031

    
4032

    
4033
def _ExpandNamesForMigration(lu):
4034
  """Expands names for use with L{TLMigrateInstance}.
4035

4036
  @type lu: L{LogicalUnit}
4037

4038
  """
4039
  if lu.op.target_node is not None:
4040
    lu.op.target_node = _ExpandNodeName(lu.cfg, lu.op.target_node)
4041

    
4042
  lu.needed_locks[locking.LEVEL_NODE] = []
4043
  lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4044

    
4045
  lu.needed_locks[locking.LEVEL_NODE_RES] = []
4046
  lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
4047

    
4048
  # The node allocation lock is actually only needed for externally replicated
4049
  # instances (e.g. sharedfile or RBD) and if an iallocator is used.
4050
  lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []
4051

    
4052

    
4053
def _DeclareLocksForMigration(lu, level):
4054
  """Declares locks for L{TLMigrateInstance}.
4055

4056
  @type lu: L{LogicalUnit}
4057
  @param level: Lock level
4058

4059
  """
4060
  if level == locking.LEVEL_NODE_ALLOC:
4061
    assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
4062

    
4063
    instance = lu.cfg.GetInstanceInfo(lu.op.instance_name)
4064

    
4065
    # Node locks are already declared here rather than at LEVEL_NODE as we need
4066
    # the instance object anyway to declare the node allocation lock.
4067
    if instance.disk_template in constants.DTS_EXT_MIRROR:
4068
      if lu.op.target_node is None:
4069
        lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4070
        lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
4071
      else:
4072
        lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
4073
                                               lu.op.target_node]
4074
      del lu.recalculate_locks[locking.LEVEL_NODE]
4075
    else:
4076
      lu._LockInstancesNodes() # pylint: disable=W0212
4077

    
4078
  elif level == locking.LEVEL_NODE:
4079
    # Node locks are declared together with the node allocation lock
4080
    assert (lu.needed_locks[locking.LEVEL_NODE] or
4081
            lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)
4082

    
4083
  elif level == locking.LEVEL_NODE_RES:
4084
    # Copy node locks
4085
    lu.needed_locks[locking.LEVEL_NODE_RES] = \
4086
      _CopyLockList(lu.needed_locks[locking.LEVEL_NODE])
4087

    
4088

    
4089
class LUInstanceFailover(LogicalUnit):
4090
  """Failover an instance.
4091

4092
  """
4093
  HPATH = "instance-failover"
4094
  HTYPE = constants.HTYPE_INSTANCE
4095
  REQ_BGL = False
4096

    
4097
  def CheckArguments(self):
4098
    """Check the arguments.
4099

4100
    """
4101
    self.iallocator = getattr(self.op, "iallocator", None)
4102
    self.target_node = getattr(self.op, "target_node", None)
4103

    
4104
  def ExpandNames(self):
4105
    self._ExpandAndLockInstance()
4106
    _ExpandNamesForMigration(self)
4107

    
4108
    self._migrater = \
4109
      TLMigrateInstance(self, self.op.instance_name, False, True, False,
4110
                        self.op.ignore_consistency, True,
4111
                        self.op.shutdown_timeout, self.op.ignore_ipolicy)
4112

    
4113
    self.tasklets = [self._migrater]