lib/cmdlib/__init__.py @ 7352d33b

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable=W0201,C0302
25

    
26
# W0201 since most LU attributes are defined in CheckPrereq or similar
27
# functions
28

    
29
# C0302: since we have waaaay too many lines in this module
30

    
31
import os
32
import time
33
import logging
34
import copy
35
import OpenSSL
36
import itertools
37
import operator
38

    
39
from ganeti import utils
40
from ganeti import errors
41
from ganeti import hypervisor
42
from ganeti import locking
43
from ganeti import constants
44
from ganeti import objects
45
from ganeti import compat
46
from ganeti import masterd
47
from ganeti import netutils
48
from ganeti import query
49
from ganeti import qlang
50
from ganeti import opcodes
51
from ganeti import ht
52
from ganeti import rpc
53
from ganeti import pathutils
54
from ganeti import network
55
from ganeti.masterd import iallocator
56

    
57
from ganeti.cmdlib.base import ResultWithJobs, LogicalUnit, NoHooksLU, \
58
  Tasklet, _QueryBase
59
from ganeti.cmdlib.common import _ExpandInstanceName, _ExpandItemName, \
60
  _ExpandNodeName, _ShareAll, _CheckNodeGroupInstances, _GetWantedNodes, \
61
  _GetWantedInstances, _RunPostHook, _RedistributeAncillaryFiles, \
62
  _MergeAndVerifyHvState, _MergeAndVerifyDiskState, _GetUpdatedIPolicy, \
63
  _ComputeNewInstanceViolations, _GetUpdatedParams, _CheckOSParams, \
64
  _CheckHVParams, _AdjustCandidatePool, _CheckNodePVs, \
65
  _ComputeIPolicyInstanceViolation, _AnnotateDiskParams, _SupportsOob, \
66
  _ComputeIPolicySpecViolation
67

    
68
from ganeti.cmdlib.cluster import LUClusterActivateMasterIp, \
69
  LUClusterDeactivateMasterIp, LUClusterConfigQuery, LUClusterDestroy, \
70
  LUClusterPostInit, _ClusterQuery, LUClusterQuery, LUClusterRedistConf, \
71
  LUClusterRename, LUClusterRepairDiskSizes, LUClusterSetParams, \
72
  LUClusterVerify, LUClusterVerifyConfig, LUClusterVerifyGroup, \
73
  LUClusterVerifyDisks
74
from ganeti.cmdlib.tags import LUTagsGet, LUTagsSearch, LUTagsSet, LUTagsDel
75
from ganeti.cmdlib.network import LUNetworkAdd, LUNetworkRemove, \
76
  LUNetworkSetParams, _NetworkQuery, LUNetworkQuery, LUNetworkConnect, \
77
  LUNetworkDisconnect
78
from ganeti.cmdlib.test import LUTestDelay, LUTestJqueue, LUTestAllocator
79

    
80
import ganeti.masterd.instance # pylint: disable=W0611
81

    
82

    
83
# States of instance
84
INSTANCE_DOWN = [constants.ADMINST_DOWN]
85
INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
86
INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE]
87

    
88
#: Instance status in which an instance can be marked as offline/online
89
CAN_CHANGE_INSTANCE_OFFLINE = (frozenset(INSTANCE_DOWN) | frozenset([
90
  constants.ADMINST_OFFLINE,
91
  ]))
92

    
93

    
94
def _CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_nodes,
95
                              cur_group_uuid):
96
  """Checks if node groups for locked instances are still correct.
97

98
  @type cfg: L{config.ConfigWriter}
99
  @param cfg: Cluster configuration
100
  @type instances: dict; string as key, L{objects.Instance} as value
101
  @param instances: Dictionary, instance name as key, instance object as value
102
  @type owned_groups: iterable of string
103
  @param owned_groups: List of owned groups
104
  @type owned_nodes: iterable of string
105
  @param owned_nodes: List of owned nodes
106
  @type cur_group_uuid: string or None
107
  @param cur_group_uuid: Optional group UUID to check against instance's groups
108

109
  """
110
  for (name, inst) in instances.items():
111
    assert owned_nodes.issuperset(inst.all_nodes), \
112
      "Instance %s's nodes changed while we kept the lock" % name
113

    
114
    inst_groups = _CheckInstanceNodeGroups(cfg, name, owned_groups)
115

    
116
    assert cur_group_uuid is None or cur_group_uuid in inst_groups, \
117
      "Instance %s has no node in group %s" % (name, cur_group_uuid)
118

    
119

    
120
def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups,
121
                             primary_only=False):
122
  """Checks if the owned node groups are still correct for an instance.
123

124
  @type cfg: L{config.ConfigWriter}
125
  @param cfg: The cluster configuration
126
  @type instance_name: string
127
  @param instance_name: Instance name
128
  @type owned_groups: set or frozenset
129
  @param owned_groups: List of currently owned node groups
130
  @type primary_only: boolean
131
  @param primary_only: Whether to check node groups for only the primary node
132

133
  """
134
  inst_groups = cfg.GetInstanceNodeGroups(instance_name, primary_only)
135

    
136
  if not owned_groups.issuperset(inst_groups):
137
    raise errors.OpPrereqError("Instance %s's node groups changed since"
138
                               " locks were acquired, current groups are"
139
                               " '%s', owning groups '%s'; retry the"
140
                               " operation" %
141
                               (instance_name,
142
                                utils.CommaJoin(inst_groups),
143
                                utils.CommaJoin(owned_groups)),
144
                               errors.ECODE_STATE)
145

    
146
  return inst_groups
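# Illustrative sketch (not part of the original module): how an LU typically
# re-checks optimistically acquired group locks once node/instance locks are
# held.  The "lu" object and the instance name below are hypothetical.
#
#   owned_groups = frozenset(lu.owned_locks(locking.LEVEL_NODEGROUP))
#   # Raises OpPrereqError(ECODE_STATE) if the instance moved to a group whose
#   # lock is not held; otherwise returns the instance's current groups.
#   inst_groups = _CheckInstanceNodeGroups(lu.cfg, "instance1.example.com",
#                                          owned_groups)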
147

    
148

    
149
def _IsExclusiveStorageEnabledNode(cfg, node):
150
  """Whether exclusive_storage is in effect for the given node.
151

152
  @type cfg: L{config.ConfigWriter}
153
  @param cfg: The cluster configuration
154
  @type node: L{objects.Node}
155
  @param node: The node
156
  @rtype: bool
157
  @return: The effective value of exclusive_storage
158

159
  """
160
  return cfg.GetNdParams(node)[constants.ND_EXCLUSIVE_STORAGE]
161

    
162

    
163
def _IsExclusiveStorageEnabledNodeName(cfg, nodename):
164
  """Whether exclusive_storage is in effect for the given node.
165

166
  @type cfg: L{config.ConfigWriter}
167
  @param cfg: The cluster configuration
168
  @type nodename: string
169
  @param nodename: The node
170
  @rtype: bool
171
  @return: The effective value of exclusive_storage
172
  @raise errors.OpPrereqError: if no node exists with the given name
173

174
  """
175
  ni = cfg.GetNodeInfo(nodename)
176
  if ni is None:
177
    raise errors.OpPrereqError("Invalid node name %s" % nodename,
178
                               errors.ECODE_NOENT)
179
  return _IsExclusiveStorageEnabledNode(cfg, ni)
180

    
181

    
182
def _CopyLockList(names):
183
  """Makes a copy of a list of lock names.
184

185
  Handles L{locking.ALL_SET} correctly.
186

187
  """
188
  if names == locking.ALL_SET:
189
    return locking.ALL_SET
190
  else:
191
    return names[:]
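# Illustrative sketch (not part of the original module): _CopyLockList lets
# callers snapshot a lock list without special-casing the ALL_SET sentinel.
#
#   assert _CopyLockList(locking.ALL_SET) == locking.ALL_SET
#   names = ["node1.example.com", "node2.example.com"]  # hypothetical names
#   copied = _CopyLockList(names)
#   copied.append("node3.example.com")
#   assert len(names) == 2  # the original list is left untouched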
192

    
193

    
194
def _ReleaseLocks(lu, level, names=None, keep=None):
195
  """Releases locks owned by an LU.
196

197
  @type lu: L{LogicalUnit}
198
  @param lu: the logical unit owning the locks
  @type level: member of C{locking.LEVELS}
  @param level: Lock level
199
  @type names: list or None
200
  @param names: Names of locks to release
201
  @type keep: list or None
202
  @param keep: Names of locks to retain
203

204
  """
205
  assert not (keep is not None and names is not None), \
206
         "Only one of the 'names' and the 'keep' parameters can be given"
207

    
208
  if names is not None:
209
    should_release = names.__contains__
210
  elif keep:
211
    should_release = lambda name: name not in keep
212
  else:
213
    should_release = None
214

    
215
  owned = lu.owned_locks(level)
216
  if not owned:
217
    # Not owning any lock at this level, do nothing
218
    pass
219

    
220
  elif should_release:
221
    retain = []
222
    release = []
223

    
224
    # Determine which locks to release
225
    for name in owned:
226
      if should_release(name):
227
        release.append(name)
228
      else:
229
        retain.append(name)
230

    
231
    assert len(lu.owned_locks(level)) == (len(retain) + len(release))
232

    
233
    # Release just some locks
234
    lu.glm.release(level, names=release)
235

    
236
    assert frozenset(lu.owned_locks(level)) == frozenset(retain)
237
  else:
238
    # Release everything
239
    lu.glm.release(level)
240

    
241
    assert not lu.glm.is_owned(level), "No locks should be owned"
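# Illustrative sketch (not part of the original module): an LU that acquired a
# whole node group but only needs its instance's nodes afterwards could call:
#
#   _ReleaseLocks(lu, locking.LEVEL_NODE, keep=list(instance.all_nodes))
#
# Passing neither "names" nor "keep" releases everything held at that level,
# while "names" releases exactly the given locks.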
242

    
243

    
244
def _MapInstanceDisksToNodes(instances):
245
  """Creates a map from (node, volume) to instance name.
246

247
  @type instances: list of L{objects.Instance}
248
  @rtype: dict
  @return: a dictionary with (node name, volume name) tuples as keys and
      instance names as values
249

250
  """
251
  return dict(((node, vol), inst.name)
252
              for inst in instances
253
              for (node, vols) in inst.MapLVsByNode().items()
254
              for vol in vols)
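# Illustrative sketch (not part of the original module): the resulting mapping
# has one entry per (node, volume) pair, e.g. for a single-disk instance
# (node and volume names below are hypothetical):
#
#   _MapInstanceDisksToNodes([inst])
#   # -> {("node1.example.com", "xenvg/disk0"): "inst1.example.com"}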
255

    
256

    
257
def _CheckOutputFields(static, dynamic, selected):
258
  """Checks whether all selected fields are valid.
259

260
  @type static: L{utils.FieldSet}
261
  @param static: static fields set
262
  @type dynamic: L{utils.FieldSet}
263
  @param dynamic: dynamic fields set
  @type selected: list of strings
  @param selected: the list of fields selected by the user to be checked
264

265
  """
266
  f = utils.FieldSet()
267
  f.Extend(static)
268
  f.Extend(dynamic)
269

    
270
  delta = f.NonMatching(selected)
271
  if delta:
272
    raise errors.OpPrereqError("Unknown output fields selected: %s"
273
                               % ",".join(delta), errors.ECODE_INVAL)
274

    
275

    
276
def _CheckParamsNotGlobal(params, glob_pars, kind, bad_levels, good_levels):
277
  """Make sure that none of the given parameters is global.
278

279
  If a global parameter is found, an L{errors.OpPrereqError} exception is
280
  raised. This is used to avoid setting global parameters for individual nodes.
281

282
  @type params: dictionary
283
  @param params: Parameters to check
284
  @type glob_pars: dictionary
285
  @param glob_pars: Forbidden parameters
286
  @type kind: string
287
  @param kind: Kind of parameters (e.g. "node")
288
  @type bad_levels: string
289
  @param bad_levels: Level(s) at which the parameters are forbidden (e.g.
290
      "instance")
291
  @type good_levels: string
292
  @param good_levels: Level(s) at which the parameters are allowed (e.g.
293
      "cluster or group")
294

295
  """
296
  used_globals = glob_pars.intersection(params)
297
  if used_globals:
298
    msg = ("The following %s parameters are global and cannot"
299
           " be customized at %s level, please modify them at"
300
           " %s level: %s" %
301
           (kind, bad_levels, good_levels, utils.CommaJoin(used_globals)))
302
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
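# Illustrative sketch (not part of the original module): rejecting an attempt
# to override a global node parameter on a single node might look like this
# (assuming constants.NDC_GLOBALS holds the global node parameter names):
#
#   _CheckParamsNotGlobal(lu.op.ndparams, constants.NDC_GLOBALS, "node",
#                         "node", "cluster or group")
#
# If lu.op.ndparams contained one of those parameters, this would raise
# OpPrereqError(ECODE_INVAL) naming the offending parameter.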
303

    
304

    
305
def _CheckNodeOnline(lu, node, msg=None):
306
  """Ensure that a given node is online.
307

308
  @param lu: the LU on behalf of which we make the check
309
  @param node: the node to check
310
  @param msg: if passed, should be a message to replace the default one
311
  @raise errors.OpPrereqError: if the node is offline
312

313
  """
314
  if msg is None:
315
    msg = "Can't use offline node"
316
  if lu.cfg.GetNodeInfo(node).offline:
317
    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)
318

    
319

    
320
def _CheckNodeNotDrained(lu, node):
321
  """Ensure that a given node is not drained.
322

323
  @param lu: the LU on behalf of which we make the check
324
  @param node: the node to check
325
  @raise errors.OpPrereqError: if the node is drained
326

327
  """
328
  if lu.cfg.GetNodeInfo(node).drained:
329
    raise errors.OpPrereqError("Can't use drained node %s" % node,
330
                               errors.ECODE_STATE)
331

    
332

    
333
def _CheckNodeVmCapable(lu, node):
334
  """Ensure that a given node is vm capable.
335

336
  @param lu: the LU on behalf of which we make the check
337
  @param node: the node to check
338
  @raise errors.OpPrereqError: if the node is not vm capable
339

340
  """
341
  if not lu.cfg.GetNodeInfo(node).vm_capable:
342
    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
343
                               errors.ECODE_STATE)
344

    
345

    
346
def _CheckNodeHasOS(lu, node, os_name, force_variant):
347
  """Ensure that a node supports a given OS.
348

349
  @param lu: the LU on behalf of which we make the check
350
  @param node: the node to check
351
  @param os_name: the OS to query about
352
  @param force_variant: whether to ignore variant errors
353
  @raise errors.OpPrereqError: if the node is not supporting the OS
354

355
  """
356
  result = lu.rpc.call_os_get(node, os_name)
357
  result.Raise("OS '%s' not in supported OS list for node %s" %
358
               (os_name, node),
359
               prereq=True, ecode=errors.ECODE_INVAL)
360
  if not force_variant:
361
    _CheckOSVariant(result.payload, os_name)
362

    
363

    
364
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
365
  """Ensure that a node has the given secondary ip.
366

367
  @type lu: L{LogicalUnit}
368
  @param lu: the LU on behalf of which we make the check
369
  @type node: string
370
  @param node: the node to check
371
  @type secondary_ip: string
372
  @param secondary_ip: the ip to check
373
  @type prereq: boolean
374
  @param prereq: whether to throw a prerequisite or an execute error
375
  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
376
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
377

378
  """
379
  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
380
  result.Raise("Failure checking secondary ip on node %s" % node,
381
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
382
  if not result.payload:
383
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
384
           " please fix and re-run this command" % secondary_ip)
385
    if prereq:
386
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
387
    else:
388
      raise errors.OpExecError(msg)
389

    
390

    
391
def _GetClusterDomainSecret():
392
  """Reads the cluster domain secret.
393

394
  """
395
  return utils.ReadOneLineFile(pathutils.CLUSTER_DOMAIN_SECRET_FILE,
396
                               strict=True)
397

    
398

    
399
def _CheckInstanceState(lu, instance, req_states, msg=None):
400
  """Ensure that an instance is in one of the required states.
401

402
  @param lu: the LU on behalf of which we make the check
403
  @param instance: the instance to check
404
  @param req_states: the admin states the instance is required to be in
  @param msg: if passed, should be a message to replace the default one
405
  @raise errors.OpPrereqError: if the instance is not in the required state
406

407
  """
408
  if msg is None:
409
    msg = ("can't use instance from outside %s states" %
410
           utils.CommaJoin(req_states))
411
  if instance.admin_state not in req_states:
412
    raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" %
413
                               (instance.name, instance.admin_state, msg),
414
                               errors.ECODE_STATE)
415

    
416
  if constants.ADMINST_UP not in req_states:
417
    pnode = instance.primary_node
418
    if not lu.cfg.GetNodeInfo(pnode).offline:
419
      ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
420
      ins_l.Raise("Can't contact node %s for instance information" % pnode,
421
                  prereq=True, ecode=errors.ECODE_ENVIRON)
422
      if instance.name in ins_l.payload:
423
        raise errors.OpPrereqError("Instance %s is running, %s" %
424
                                   (instance.name, msg), errors.ECODE_STATE)
425
    else:
426
      lu.LogWarning("Primary node offline, ignoring check that instance"
427
                     " is down")
428

    
429

    
430
def _ComputeIPolicyInstanceSpecViolation(
431
  ipolicy, instance_spec, disk_template,
432
  _compute_fn=_ComputeIPolicySpecViolation):
433
  """Compute if the instance specs meet the specs of the ipolicy.
434

435
  @type ipolicy: dict
436
  @param ipolicy: The ipolicy to verify against
437
  @type instance_spec: dict
438
  @param instance_spec: The instance spec to verify
439
  @type disk_template: string
440
  @param disk_template: the disk template of the instance
441
  @param _compute_fn: The function to verify ipolicy (unittest only)
442
  @see: L{_ComputeIPolicySpecViolation}
443

444
  """
445
  mem_size = instance_spec.get(constants.ISPEC_MEM_SIZE, None)
446
  cpu_count = instance_spec.get(constants.ISPEC_CPU_COUNT, None)
447
  disk_count = instance_spec.get(constants.ISPEC_DISK_COUNT, 0)
448
  disk_sizes = instance_spec.get(constants.ISPEC_DISK_SIZE, [])
449
  nic_count = instance_spec.get(constants.ISPEC_NIC_COUNT, 0)
450
  spindle_use = instance_spec.get(constants.ISPEC_SPINDLE_USE, None)
451

    
452
  return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
453
                     disk_sizes, spindle_use, disk_template)
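# Illustrative sketch (not part of the original module): instance_spec is a
# plain dict keyed by the ISPEC_* constants, for example:
#
#   spec = {
#     constants.ISPEC_MEM_SIZE: 512,
#     constants.ISPEC_CPU_COUNT: 1,
#     constants.ISPEC_DISK_COUNT: 1,
#     constants.ISPEC_DISK_SIZE: [1024],
#     constants.ISPEC_NIC_COUNT: 1,
#     constants.ISPEC_SPINDLE_USE: 1,
#     }
#   violations = _ComputeIPolicyInstanceSpecViolation(cluster.ipolicy, spec,
#                                                     constants.DT_PLAIN)
#   # -> [] if the spec fits the policy, otherwise a list of messages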
454

    
455

    
456
def _ComputeIPolicyNodeViolation(ipolicy, instance, current_group,
457
                                 target_group, cfg,
458
                                 _compute_fn=_ComputeIPolicyInstanceViolation):
459
  """Compute if instance meets the specs of the new target group.
460

461
  @param ipolicy: The ipolicy to verify
462
  @param instance: The instance object to verify
463
  @param current_group: The current group of the instance
464
  @param target_group: The new group of the instance
465
  @type cfg: L{config.ConfigWriter}
466
  @param cfg: Cluster configuration
467
  @param _compute_fn: The function to verify ipolicy (unittest only)
468
  @see: L{_ComputeIPolicySpecViolation}
469

470
  """
471
  if current_group == target_group:
472
    return []
473
  else:
474
    return _compute_fn(ipolicy, instance, cfg)
475

    
476

    
477
def _CheckTargetNodeIPolicy(lu, ipolicy, instance, node, cfg, ignore=False,
478
                            _compute_fn=_ComputeIPolicyNodeViolation):
479
  """Checks that the target node is correct in terms of instance policy.
480

481
  @param ipolicy: The ipolicy to verify
482
  @param instance: The instance object to verify
483
  @param node: The new node to relocate
484
  @type cfg: L{config.ConfigWriter}
485
  @param cfg: Cluster configuration
486
  @param ignore: Ignore violations of the ipolicy
487
  @param _compute_fn: The function to verify ipolicy (unittest only)
488
  @see: L{_ComputeIPolicySpecViolation}
489

490
  """
491
  primary_node = lu.cfg.GetNodeInfo(instance.primary_node)
492
  res = _compute_fn(ipolicy, instance, primary_node.group, node.group, cfg)
493

    
494
  if res:
495
    msg = ("Instance does not meet target node group's (%s) instance"
496
           " policy: %s") % (node.group, utils.CommaJoin(res))
497
    if ignore:
498
      lu.LogWarning(msg)
499
    else:
500
      raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
501

    
502

    
503
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
504
                          minmem, maxmem, vcpus, nics, disk_template, disks,
505
                          bep, hvp, hypervisor_name, tags):
506
  """Builds instance related env variables for hooks
507

508
  This builds the hook environment from individual variables.
509

510
  @type name: string
511
  @param name: the name of the instance
512
  @type primary_node: string
513
  @param primary_node: the name of the instance's primary node
514
  @type secondary_nodes: list
515
  @param secondary_nodes: list of secondary nodes as strings
516
  @type os_type: string
517
  @param os_type: the name of the instance's OS
518
  @type status: string
519
  @param status: the desired status of the instance
520
  @type minmem: string
521
  @param minmem: the minimum memory size of the instance
522
  @type maxmem: string
523
  @param maxmem: the maximum memory size of the instance
524
  @type vcpus: string
525
  @param vcpus: the count of VCPUs the instance has
526
  @type nics: list
527
  @param nics: list of tuples (name, uuid, ip, mac, mode, link, net, netinfo)
528
      representing the NICs the instance has
529
  @type disk_template: string
530
  @param disk_template: the disk template of the instance
531
  @type disks: list
532
  @param disks: list of tuples (name, size, mode)
533
  @type bep: dict
534
  @param bep: the backend parameters for the instance
535
  @type hvp: dict
536
  @param hvp: the hypervisor parameters for the instance
537
  @type hypervisor_name: string
538
  @param hypervisor_name: the hypervisor for the instance
539
  @type tags: list
540
  @param tags: list of instance tags as strings
541
  @rtype: dict
542
  @return: the hook environment for this instance
543

544
  """
545
  env = {
546
    "OP_TARGET": name,
547
    "INSTANCE_NAME": name,
548
    "INSTANCE_PRIMARY": primary_node,
549
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
550
    "INSTANCE_OS_TYPE": os_type,
551
    "INSTANCE_STATUS": status,
552
    "INSTANCE_MINMEM": minmem,
553
    "INSTANCE_MAXMEM": maxmem,
554
    # TODO(2.9) remove deprecated "memory" value
555
    "INSTANCE_MEMORY": maxmem,
556
    "INSTANCE_VCPUS": vcpus,
557
    "INSTANCE_DISK_TEMPLATE": disk_template,
558
    "INSTANCE_HYPERVISOR": hypervisor_name,
559
  }
560
  if nics:
561
    nic_count = len(nics)
562
    for idx, (name, _, ip, mac, mode, link, net, netinfo) in enumerate(nics):
563
      if ip is None:
564
        ip = ""
565
      env["INSTANCE_NIC%d_NAME" % idx] = name
566
      env["INSTANCE_NIC%d_IP" % idx] = ip
567
      env["INSTANCE_NIC%d_MAC" % idx] = mac
568
      env["INSTANCE_NIC%d_MODE" % idx] = mode
569
      env["INSTANCE_NIC%d_LINK" % idx] = link
570
      if netinfo:
571
        nobj = objects.Network.FromDict(netinfo)
572
        env.update(nobj.HooksDict("INSTANCE_NIC%d_" % idx))
573
      elif network:
574
        # FIXME: broken network reference: the instance NIC specifies a
575
        # network, but the relevant network entry was not in the config. This
576
        # should be made impossible.
577
        env["INSTANCE_NIC%d_NETWORK_NAME" % idx] = net
578
      if mode == constants.NIC_MODE_BRIDGED:
579
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
580
  else:
581
    nic_count = 0
582

    
583
  env["INSTANCE_NIC_COUNT"] = nic_count
584

    
585
  if disks:
586
    disk_count = len(disks)
587
    for idx, (name, size, mode) in enumerate(disks):
588
      env["INSTANCE_DISK%d_NAME" % idx] = name
589
      env["INSTANCE_DISK%d_SIZE" % idx] = size
590
      env["INSTANCE_DISK%d_MODE" % idx] = mode
591
  else:
592
    disk_count = 0
593

    
594
  env["INSTANCE_DISK_COUNT"] = disk_count
595

    
596
  if not tags:
597
    tags = []
598

    
599
  env["INSTANCE_TAGS"] = " ".join(tags)
600

    
601
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
602
    for key, value in source.items():
603
      env["INSTANCE_%s_%s" % (kind, key)] = value
604

    
605
  return env
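# Illustrative sketch (not part of the original module): for a single-NIC,
# single-disk instance, the environment built above would contain keys such as
# OP_TARGET, INSTANCE_NAME, INSTANCE_PRIMARY, INSTANCE_NIC_COUNT (1),
# INSTANCE_NIC0_MAC, INSTANCE_DISK_COUNT (1), INSTANCE_DISK0_SIZE, plus one
# INSTANCE_BE_* and INSTANCE_HV_* entry per backend/hypervisor parameter.
# Callers normally go through _BuildInstanceHookEnvByObject (below) rather
# than assembling these arguments by hand.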
606

    
607

    
608
def _NICToTuple(lu, nic):
609
  """Build a tuple of nic information.
610

611
  @type lu:  L{LogicalUnit}
612
  @param lu: the logical unit on whose behalf we execute
613
  @type nic: L{objects.NIC}
614
  @param nic: nic to convert to hooks tuple
615

616
  """
617
  cluster = lu.cfg.GetClusterInfo()
618
  filled_params = cluster.SimpleFillNIC(nic.nicparams)
619
  mode = filled_params[constants.NIC_MODE]
620
  link = filled_params[constants.NIC_LINK]
621
  netinfo = None
622
  if nic.network:
623
    nobj = lu.cfg.GetNetwork(nic.network)
624
    netinfo = objects.Network.ToDict(nobj)
625
  return (nic.name, nic.uuid, nic.ip, nic.mac, mode, link, nic.network, netinfo)
626

    
627

    
628
def _NICListToTuple(lu, nics):
629
  """Build a list of nic information tuples.
630

631
  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
632
  value in LUInstanceQueryData.
633

634
  @type lu:  L{LogicalUnit}
635
  @param lu: the logical unit on whose behalf we execute
636
  @type nics: list of L{objects.NIC}
637
  @param nics: list of nics to convert to hooks tuples
638

639
  """
640
  hooks_nics = []
641
  for nic in nics:
642
    hooks_nics.append(_NICToTuple(lu, nic))
643
  return hooks_nics
644

    
645

    
646
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
647
  """Builds instance related env variables for hooks from an object.
648

649
  @type lu: L{LogicalUnit}
650
  @param lu: the logical unit on whose behalf we execute
651
  @type instance: L{objects.Instance}
652
  @param instance: the instance for which we should build the
653
      environment
654
  @type override: dict
655
  @param override: dictionary with key/values that will override
656
      our values
657
  @rtype: dict
658
  @return: the hook environment dictionary
659

660
  """
661
  cluster = lu.cfg.GetClusterInfo()
662
  bep = cluster.FillBE(instance)
663
  hvp = cluster.FillHV(instance)
664
  args = {
665
    "name": instance.name,
666
    "primary_node": instance.primary_node,
667
    "secondary_nodes": instance.secondary_nodes,
668
    "os_type": instance.os,
669
    "status": instance.admin_state,
670
    "maxmem": bep[constants.BE_MAXMEM],
671
    "minmem": bep[constants.BE_MINMEM],
672
    "vcpus": bep[constants.BE_VCPUS],
673
    "nics": _NICListToTuple(lu, instance.nics),
674
    "disk_template": instance.disk_template,
675
    "disks": [(disk.name, disk.size, disk.mode)
676
              for disk in instance.disks],
677
    "bep": bep,
678
    "hvp": hvp,
679
    "hypervisor_name": instance.hypervisor,
680
    "tags": instance.tags,
681
  }
682
  if override:
683
    args.update(override)
684
  return _BuildInstanceHookEnv(**args) # pylint: disable=W0142
685

    
686

    
687
def _DecideSelfPromotion(lu, exceptions=None):
688
  """Decide whether I should promote myself as a master candidate.
689

690
  """
691
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
692
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
693
  # the new node will increase mc_max by one, so:
694
  mc_should = min(mc_should + 1, cp_size)
695
  return mc_now < mc_should
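# Illustrative worked example (not part of the original module): with a
# candidate_pool_size of 10, mc_now == 3 and mc_should == 5, adding this node
# gives mc_should = min(5 + 1, 10) = 6, so 3 < 6 and the node should promote
# itself to master candidate.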
696

    
697

    
698
def _CheckNicsBridgesExist(lu, target_nics, target_node):
699
  """Check that the bridges needed by a list of nics exist.
700

701
  """
702
  cluster = lu.cfg.GetClusterInfo()
703
  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
704
  brlist = [params[constants.NIC_LINK] for params in paramslist
705
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
706
  if brlist:
707
    result = lu.rpc.call_bridges_exist(target_node, brlist)
708
    result.Raise("Error checking bridges on destination node '%s'" %
709
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
710

    
711

    
712
def _CheckInstanceBridgesExist(lu, instance, node=None):
713
  """Check that the bridges needed by an instance exist.
714

715
  """
716
  if node is None:
717
    node = instance.primary_node
718
  _CheckNicsBridgesExist(lu, instance.nics, node)
719

    
720

    
721
def _CheckOSVariant(os_obj, name):
722
  """Check whether an OS name conforms to the os variants specification.
723

724
  @type os_obj: L{objects.OS}
725
  @param os_obj: OS object to check
726
  @type name: string
727
  @param name: OS name passed by the user, to check for validity
728

729
  """
730
  variant = objects.OS.GetVariant(name)
731
  if not os_obj.supported_variants:
732
    if variant:
733
      raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'"
734
                                 " passed)" % (os_obj.name, variant),
735
                                 errors.ECODE_INVAL)
736
    return
737
  if not variant:
738
    raise errors.OpPrereqError("OS name must include a variant",
739
                               errors.ECODE_INVAL)
740

    
741
  if variant not in os_obj.supported_variants:
742
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
743

    
744

    
745
def _GetNodeInstancesInner(cfg, fn):
746
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
747

    
748

    
749
def _GetNodeInstances(cfg, node_name):
750
  """Returns a list of all primary and secondary instances on a node.
751

752
  """
753

    
754
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
755

    
756

    
757
def _GetNodePrimaryInstances(cfg, node_name):
758
  """Returns primary instances on a node.
759

760
  """
761
  return _GetNodeInstancesInner(cfg,
762
                                lambda inst: node_name == inst.primary_node)
763

    
764

    
765
def _GetNodeSecondaryInstances(cfg, node_name):
766
  """Returns secondary instances on a node.
767

768
  """
769
  return _GetNodeInstancesInner(cfg,
770
                                lambda inst: node_name in inst.secondary_nodes)
771

    
772

    
773
def _GetStorageTypeArgs(cfg, storage_type):
774
  """Returns the arguments for a storage type.
775

776
  """
777
  # Special case for file storage
778
  if storage_type == constants.ST_FILE:
779
    # storage.FileStorage wants a list of storage directories
780
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
781

    
782
  return []
783

    
784

    
785
def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq):
786
  faulty = []
787

    
788
  for dev in instance.disks:
789
    cfg.SetDiskID(dev, node_name)
790

    
791
  result = rpc_runner.call_blockdev_getmirrorstatus(node_name, (instance.disks,
792
                                                                instance))
793
  result.Raise("Failed to get disk status from node %s" % node_name,
794
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
795

    
796
  for idx, bdev_status in enumerate(result.payload):
797
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
798
      faulty.append(idx)
799

    
800
  return faulty
801

    
802

    
803
def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
804
  """Check the sanity of iallocator and node arguments and use the
805
  cluster-wide iallocator if appropriate.
806

807
  Check that at most one of (iallocator, node) is specified. If none is
808
  specified, or the iallocator is L{constants.DEFAULT_IALLOCATOR_SHORTCUT},
809
  then the LU's opcode's iallocator slot is filled with the cluster-wide
810
  default iallocator.
811

812
  @type iallocator_slot: string
813
  @param iallocator_slot: the name of the opcode iallocator slot
814
  @type node_slot: string
815
  @param node_slot: the name of the opcode target node slot
816

817
  """
818
  node = getattr(lu.op, node_slot, None)
819
  ialloc = getattr(lu.op, iallocator_slot, None)
820
  if node == []:
821
    node = None
822

    
823
  if node is not None and ialloc is not None:
824
    raise errors.OpPrereqError("Do not specify both an iallocator and a node",
825
                               errors.ECODE_INVAL)
826
  elif ((node is None and ialloc is None) or
827
        ialloc == constants.DEFAULT_IALLOCATOR_SHORTCUT):
828
    default_iallocator = lu.cfg.GetDefaultIAllocator()
829
    if default_iallocator:
830
      setattr(lu.op, iallocator_slot, default_iallocator)
831
    else:
832
      raise errors.OpPrereqError("No iallocator or node given and no"
833
                                 " cluster-wide default iallocator found;"
834
                                 " please specify either an iallocator or a"
835
                                 " node, or set a cluster-wide default"
836
                                 " iallocator", errors.ECODE_INVAL)
837
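# Illustrative summary (not part of the original module) of the accepted
# (iallocator, node) combinations checked above:
#
#   both given              -> OpPrereqError (mutually exclusive)
#   only node given         -> use that node as-is
#   only iallocator given   -> use it; DEFAULT_IALLOCATOR_SHORTCUT is replaced
#                              by the cluster default
#   neither given           -> fall back to the cluster default iallocator,
#                              or fail if none is configured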

    
838

    
839
def _GetDefaultIAllocator(cfg, ialloc):
840
  """Decides on which iallocator to use.
841

842
  @type cfg: L{config.ConfigWriter}
843
  @param cfg: Cluster configuration object
844
  @type ialloc: string or None
845
  @param ialloc: Iallocator specified in opcode
846
  @rtype: string
847
  @return: Iallocator name
848

849
  """
850
  if not ialloc:
851
    # Use default iallocator
852
    ialloc = cfg.GetDefaultIAllocator()
853

    
854
  if not ialloc:
855
    raise errors.OpPrereqError("No iallocator was specified, neither in the"
856
                               " opcode nor as a cluster-wide default",
857
                               errors.ECODE_INVAL)
858

    
859
  return ialloc
860

    
861

    
862
def _CheckHostnameSane(lu, name):
863
  """Ensures that a given hostname resolves to a 'sane' name.
864

865
  The given name is required to be a prefix of the resolved hostname,
866
  to prevent accidental mismatches.
867

868
  @param lu: the logical unit on behalf of which we're checking
869
  @param name: the name we should resolve and check
870
  @return: the resolved hostname object
871

872
  """
873
  hostname = netutils.GetHostname(name=name)
874
  if hostname.name != name:
875
    lu.LogInfo("Resolved given name '%s' to '%s'", name, hostname.name)
876
  if not utils.MatchNameComponent(name, [hostname.name]):
877
    raise errors.OpPrereqError(("Resolved hostname '%s' does not look the"
878
                                " same as given hostname '%s'") %
879
                                (hostname.name, name), errors.ECODE_INVAL)
880
  return hostname
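# Illustrative sketch (not part of the original module): a short name that is
# a prefix component of its resolved FQDN passes, anything else is rejected.
# The names below are hypothetical.
#
#   _CheckHostnameSane(lu, "inst1")    # ok if it resolves to inst1.example.com
#   _CheckHostnameSane(lu, "badname")  # raises OpPrereqError(ECODE_INVAL) if
#                                      # the resolver returns an unrelated name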
881

    
882

    
883
class LUGroupVerifyDisks(NoHooksLU):
884
  """Verifies the status of all disks in a node group.
885

886
  """
887
  REQ_BGL = False
888

    
889
  def ExpandNames(self):
890
    # Raises errors.OpPrereqError on its own if group can't be found
891
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
892

    
893
    self.share_locks = _ShareAll()
894
    self.needed_locks = {
895
      locking.LEVEL_INSTANCE: [],
896
      locking.LEVEL_NODEGROUP: [],
897
      locking.LEVEL_NODE: [],
898

    
899
      # This opcode acquires all node locks in a group. LUClusterVerifyDisks
900
      # starts one instance of this opcode for every group, which means all
901
      # nodes will be locked for a short amount of time, so it's better to
902
      # acquire the node allocation lock as well.
903
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
904
      }
905

    
906
  def DeclareLocks(self, level):
907
    if level == locking.LEVEL_INSTANCE:
908
      assert not self.needed_locks[locking.LEVEL_INSTANCE]
909

    
910
      # Lock instances optimistically, needs verification once node and group
911
      # locks have been acquired
912
      self.needed_locks[locking.LEVEL_INSTANCE] = \
913
        self.cfg.GetNodeGroupInstances(self.group_uuid)
914

    
915
    elif level == locking.LEVEL_NODEGROUP:
916
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
917

    
918
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
919
        set([self.group_uuid] +
920
            # Lock all groups used by instances optimistically; this requires
921
            # going via the node before it's locked, requiring verification
922
            # later on
923
            [group_uuid
924
             for instance_name in self.owned_locks(locking.LEVEL_INSTANCE)
925
             for group_uuid in self.cfg.GetInstanceNodeGroups(instance_name)])
926

    
927
    elif level == locking.LEVEL_NODE:
928
      # This will only lock the nodes in the group to be verified which contain
929
      # actual instances
930
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
931
      self._LockInstancesNodes()
932

    
933
      # Lock all nodes in group to be verified
934
      assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
935
      member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members
936
      self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)
937

    
938
  def CheckPrereq(self):
939
    owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
940
    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
941
    owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
942

    
943
    assert self.group_uuid in owned_groups
944

    
945
    # Check if locked instances are still correct
946
    _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances)
947

    
948
    # Get instance information
949
    self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances))
950

    
951
    # Check if node groups for locked instances are still correct
952
    _CheckInstancesNodeGroups(self.cfg, self.instances,
953
                              owned_groups, owned_nodes, self.group_uuid)
954

    
955
  def Exec(self, feedback_fn):
956
    """Verify integrity of cluster disks.
957

958
    @rtype: tuple of three items
959
    @return: a tuple of (dict of node-to-node_error, list of instances
960
        which need activate-disks, dict of instance: (node, volume) for
961
        missing volumes)
962

963
    """
964
    res_nodes = {}
965
    res_instances = set()
966
    res_missing = {}
967

    
968
    nv_dict = _MapInstanceDisksToNodes(
969
      [inst for inst in self.instances.values()
970
       if inst.admin_state == constants.ADMINST_UP])
971

    
972
    if nv_dict:
973
      nodes = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) &
974
                             set(self.cfg.GetVmCapableNodeList()))
975

    
976
      node_lvs = self.rpc.call_lv_list(nodes, [])
977

    
978
      for (node, node_res) in node_lvs.items():
979
        if node_res.offline:
980
          continue
981

    
982
        msg = node_res.fail_msg
983
        if msg:
984
          logging.warning("Error enumerating LVs on node %s: %s", node, msg)
985
          res_nodes[node] = msg
986
          continue
987

    
988
        for lv_name, (_, _, lv_online) in node_res.payload.items():
989
          inst = nv_dict.pop((node, lv_name), None)
990
          if not (lv_online or inst is None):
991
            res_instances.add(inst)
992

    
993
      # any leftover items in nv_dict are missing LVs, let's arrange the data
994
      # better
995
      for key, inst in nv_dict.iteritems():
996
        res_missing.setdefault(inst, []).append(list(key))
997

    
998
    return (res_nodes, list(res_instances), res_missing)
999

    
1000

    
1001
def _WaitForSync(lu, instance, disks=None, oneshot=False):
1002
  """Sleep and poll for an instance's disk to sync.
1003

1004
  """
1005
  if not instance.disks or disks is not None and not disks:
1006
    return True
1007

    
1008
  disks = _ExpandCheckDisks(instance, disks)
1009

    
1010
  if not oneshot:
1011
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1012

    
1013
  node = instance.primary_node
1014

    
1015
  for dev in disks:
1016
    lu.cfg.SetDiskID(dev, node)
1017

    
1018
  # TODO: Convert to utils.Retry
1019

    
1020
  retries = 0
1021
  degr_retries = 10 # in seconds, as we sleep 1 second each time
1022
  while True:
1023
    max_time = 0
1024
    done = True
1025
    cumul_degraded = False
1026
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
1027
    msg = rstats.fail_msg
1028
    if msg:
1029
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1030
      retries += 1
1031
      if retries >= 10:
1032
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1033
                                 " aborting." % node)
1034
      time.sleep(6)
1035
      continue
1036
    rstats = rstats.payload
1037
    retries = 0
1038
    for i, mstat in enumerate(rstats):
1039
      if mstat is None:
1040
        lu.LogWarning("Can't compute data for node %s/%s",
1041
                           node, disks[i].iv_name)
1042
        continue
1043

    
1044
      cumul_degraded = (cumul_degraded or
1045
                        (mstat.is_degraded and mstat.sync_percent is None))
1046
      if mstat.sync_percent is not None:
1047
        done = False
1048
        if mstat.estimated_time is not None:
1049
          rem_time = ("%s remaining (estimated)" %
1050
                      utils.FormatSeconds(mstat.estimated_time))
1051
          max_time = mstat.estimated_time
1052
        else:
1053
          rem_time = "no time estimate"
1054
        lu.LogInfo("- device %s: %5.2f%% done, %s",
1055
                   disks[i].iv_name, mstat.sync_percent, rem_time)
1056

    
1057
    # if we're done but degraded, let's do a few small retries, to
1058
    # make sure we see a stable and not transient situation; therefore
1059
    # we force restart of the loop
1060
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
1061
      logging.info("Degraded disks found, %d retries left", degr_retries)
1062
      degr_retries -= 1
1063
      time.sleep(1)
1064
      continue
1065

    
1066
    if done or oneshot:
1067
      break
1068

    
1069
    time.sleep(min(60, max_time))
1070

    
1071
  if done:
1072
    lu.LogInfo("Instance %s's disks are in sync", instance.name)
1073

    
1074
  return not cumul_degraded
1075

    
1076

    
1077
def _BlockdevFind(lu, node, dev, instance):
1078
  """Wrapper around call_blockdev_find to annotate diskparams.
1079

1080
  @param lu: A reference to the lu object
1081
  @param node: The node to call out to
1082
  @param dev: The device to find
1083
  @param instance: The instance object the device belongs to
1084
  @return: The result of the rpc call
1085

1086
  """
1087
  (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
1088
  return lu.rpc.call_blockdev_find(node, disk)
1089

    
1090

    
1091
def _CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
1092
  """Wrapper around L{_CheckDiskConsistencyInner}.
1093

1094
  """
1095
  (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
1096
  return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
1097
                                    ldisk=ldisk)
1098

    
1099

    
1100
def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
1101
                               ldisk=False):
1102
  """Check that mirrors are not degraded.
1103

1104
  @attention: The device has to be annotated already.
1105

1106
  The ldisk parameter, if True, will change the test from the
1107
  is_degraded attribute (which represents overall non-ok status for
1108
  the device(s)) to the ldisk (representing the local storage status).
1109

1110
  """
1111
  lu.cfg.SetDiskID(dev, node)
1112

    
1113
  result = True
1114

    
1115
  if on_primary or dev.AssembleOnSecondary():
1116
    rstats = lu.rpc.call_blockdev_find(node, dev)
1117
    msg = rstats.fail_msg
1118
    if msg:
1119
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1120
      result = False
1121
    elif not rstats.payload:
1122
      lu.LogWarning("Can't find disk on node %s", node)
1123
      result = False
1124
    else:
1125
      if ldisk:
1126
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
1127
      else:
1128
        result = result and not rstats.payload.is_degraded
1129

    
1130
  if dev.children:
1131
    for child in dev.children:
1132
      result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
1133
                                                     on_primary)
1134

    
1135
  return result
1136

    
1137

    
1138
class LUOobCommand(NoHooksLU):
1139
  """Logical unit for OOB handling.
1140

1141
  """
1142
  REQ_BGL = False
1143
  _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)
1144

    
1145
  def ExpandNames(self):
1146
    """Gather locks we need.
1147

1148
    """
1149
    if self.op.node_names:
1150
      self.op.node_names = _GetWantedNodes(self, self.op.node_names)
1151
      lock_names = self.op.node_names
1152
    else:
1153
      lock_names = locking.ALL_SET
1154

    
1155
    self.needed_locks = {
1156
      locking.LEVEL_NODE: lock_names,
1157
      }
1158

    
1159
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
1160

    
1161
    if not self.op.node_names:
1162
      # Acquire node allocation lock only if all nodes are affected
1163
      self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1164

    
1165
  def CheckPrereq(self):
1166
    """Check prerequisites.
1167

1168
    This checks:
1169
     - the node exists in the configuration
1170
     - OOB is supported
1171

1172
    Any errors are signaled by raising errors.OpPrereqError.
1173

1174
    """
1175
    self.nodes = []
1176
    self.master_node = self.cfg.GetMasterNode()
1177

    
1178
    assert self.op.power_delay >= 0.0
1179

    
1180
    if self.op.node_names:
1181
      if (self.op.command in self._SKIP_MASTER and
1182
          self.master_node in self.op.node_names):
1183
        master_node_obj = self.cfg.GetNodeInfo(self.master_node)
1184
        master_oob_handler = _SupportsOob(self.cfg, master_node_obj)
1185

    
1186
        if master_oob_handler:
1187
          additional_text = ("run '%s %s %s' if you want to operate on the"
1188
                             " master regardless") % (master_oob_handler,
1189
                                                      self.op.command,
1190
                                                      self.master_node)
1191
        else:
1192
          additional_text = "it does not support out-of-band operations"
1193

    
1194
        raise errors.OpPrereqError(("Operating on the master node %s is not"
1195
                                    " allowed for %s; %s") %
1196
                                   (self.master_node, self.op.command,
1197
                                    additional_text), errors.ECODE_INVAL)
1198
    else:
1199
      self.op.node_names = self.cfg.GetNodeList()
1200
      if self.op.command in self._SKIP_MASTER:
1201
        self.op.node_names.remove(self.master_node)
1202

    
1203
    if self.op.command in self._SKIP_MASTER:
1204
      assert self.master_node not in self.op.node_names
1205

    
1206
    for (node_name, node) in self.cfg.GetMultiNodeInfo(self.op.node_names):
1207
      if node is None:
1208
        raise errors.OpPrereqError("Node %s not found" % node_name,
1209
                                   errors.ECODE_NOENT)
1210
      else:
1211
        self.nodes.append(node)
1212

    
1213
      if (not self.op.ignore_status and
1214
          (self.op.command == constants.OOB_POWER_OFF and not node.offline)):
1215
        raise errors.OpPrereqError(("Cannot power off node %s because it is"
1216
                                    " not marked offline") % node_name,
1217
                                   errors.ECODE_STATE)
1218

    
1219
  def Exec(self, feedback_fn):
1220
    """Execute OOB and return result if we expect any.
1221

1222
    """
1223
    master_node = self.master_node
1224
    ret = []
1225

    
1226
    for idx, node in enumerate(utils.NiceSort(self.nodes,
1227
                                              key=lambda node: node.name)):
1228
      node_entry = [(constants.RS_NORMAL, node.name)]
1229
      ret.append(node_entry)
1230

    
1231
      oob_program = _SupportsOob(self.cfg, node)
1232

    
1233
      if not oob_program:
1234
        node_entry.append((constants.RS_UNAVAIL, None))
1235
        continue
1236

    
1237
      logging.info("Executing out-of-band command '%s' using '%s' on %s",
1238
                   self.op.command, oob_program, node.name)
1239
      result = self.rpc.call_run_oob(master_node, oob_program,
1240
                                     self.op.command, node.name,
1241
                                     self.op.timeout)
1242

    
1243
      if result.fail_msg:
1244
        self.LogWarning("Out-of-band RPC failed on node '%s': %s",
1245
                        node.name, result.fail_msg)
1246
        node_entry.append((constants.RS_NODATA, None))
1247
      else:
1248
        try:
1249
          self._CheckPayload(result)
1250
        except errors.OpExecError, err:
1251
          self.LogWarning("Payload returned by node '%s' is not valid: %s",
1252
                          node.name, err)
1253
          node_entry.append((constants.RS_NODATA, None))
1254
        else:
1255
          if self.op.command == constants.OOB_HEALTH:
1256
            # For health we should log important events
1257
            for item, status in result.payload:
1258
              if status in [constants.OOB_STATUS_WARNING,
1259
                            constants.OOB_STATUS_CRITICAL]:
1260
                self.LogWarning("Item '%s' on node '%s' has status '%s'",
1261
                                item, node.name, status)
1262

    
1263
          if self.op.command == constants.OOB_POWER_ON:
1264
            node.powered = True
1265
          elif self.op.command == constants.OOB_POWER_OFF:
1266
            node.powered = False
1267
          elif self.op.command == constants.OOB_POWER_STATUS:
1268
            powered = result.payload[constants.OOB_POWER_STATUS_POWERED]
1269
            if powered != node.powered:
1270
              logging.warning(("Recorded power state (%s) of node '%s' does not"
1271
                               " match actual power state (%s)"), node.powered,
1272
                              node.name, powered)
1273

    
1274
          # For configuration changing commands we should update the node
1275
          if self.op.command in (constants.OOB_POWER_ON,
1276
                                 constants.OOB_POWER_OFF):
1277
            self.cfg.Update(node, feedback_fn)
1278

    
1279
          node_entry.append((constants.RS_NORMAL, result.payload))
1280

    
1281
          if (self.op.command == constants.OOB_POWER_ON and
1282
              idx < len(self.nodes) - 1):
1283
            time.sleep(self.op.power_delay)
1284

    
1285
    return ret
1286

    
1287
  def _CheckPayload(self, result):
1288
    """Checks if the payload is valid.
1289

1290
    @param result: RPC result
1291
    @raises errors.OpExecError: If payload is not valid
1292

1293
    """
1294
    errs = []
1295
    if self.op.command == constants.OOB_HEALTH:
1296
      if not isinstance(result.payload, list):
1297
        errs.append("command 'health' is expected to return a list but got %s" %
1298
                    type(result.payload))
1299
      else:
1300
        for item, status in result.payload:
1301
          if status not in constants.OOB_STATUSES:
1302
            errs.append("health item '%s' has invalid status '%s'" %
1303
                        (item, status))
1304

    
1305
    if self.op.command == constants.OOB_POWER_STATUS:
1306
      if not isinstance(result.payload, dict):
1307
        errs.append("power-status is expected to return a dict but got %s" %
1308
                    type(result.payload))
1309

    
1310
    if self.op.command in [
1311
      constants.OOB_POWER_ON,
1312
      constants.OOB_POWER_OFF,
1313
      constants.OOB_POWER_CYCLE,
1314
      ]:
1315
      if result.payload is not None:
1316
        errs.append("%s is expected to not return payload but got '%s'" %
1317
                    (self.op.command, result.payload))
1318

    
1319
    if errs:
1320
      raise errors.OpExecError("Check of out-of-band payload failed due to %s" %
1321
                               utils.CommaJoin(errs))
1322

    
1323

    
1324
class _OsQuery(_QueryBase):
1325
  FIELDS = query.OS_FIELDS
1326

    
1327
  def ExpandNames(self, lu):
1328
    # Lock all nodes in shared mode
1329
    # Temporary removal of locks, should be reverted later
1330
    # TODO: reintroduce locks when they are lighter-weight
1331
    lu.needed_locks = {}
1332
    #self.share_locks[locking.LEVEL_NODE] = 1
1333
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1334

    
1335
    # The following variables interact with _QueryBase._GetNames
1336
    if self.names:
1337
      self.wanted = self.names
1338
    else:
1339
      self.wanted = locking.ALL_SET
1340

    
1341
    self.do_locking = self.use_locking
1342

    
1343
  def DeclareLocks(self, lu, level):
1344
    pass
1345

    
1346
  @staticmethod
1347
  def _DiagnoseByOS(rlist):
1348
    """Remaps a per-node return list into a per-os per-node dictionary
1349

1350
    @param rlist: a map with node names as keys and OS objects as values
1351

1352
    @rtype: dict
1353
    @return: a dictionary with osnames as keys and as value another
1354
        map, with nodes as keys and tuples of (path, status, diagnose,
1355
        variants, parameters, api_versions) as values, eg::
1356

1357
          {"debian-etch": {"node1": [(/usr/lib/..., True, "", [], []),
1358
                                     (/srv/..., False, "invalid api")],
1359
                           "node2": [(/srv/..., True, "", [], [])]}
1360
          }
1361

1362
    """
1363
    all_os = {}
1364
    # we build here the list of nodes that didn't fail the RPC (at RPC
1365
    # level), so that nodes with a non-responding node daemon don't
1366
    # make all OSes invalid
1367
    good_nodes = [node_name for node_name in rlist
1368
                  if not rlist[node_name].fail_msg]
1369
    for node_name, nr in rlist.items():
1370
      if nr.fail_msg or not nr.payload:
1371
        continue
1372
      for (name, path, status, diagnose, variants,
1373
           params, api_versions) in nr.payload:
1374
        if name not in all_os:
1375
          # build a list of nodes for this os containing empty lists
1376
          # for each node in node_list
1377
          all_os[name] = {}
1378
          for nname in good_nodes:
1379
            all_os[name][nname] = []
1380
        # convert params from [name, help] to (name, help)
1381
        params = [tuple(v) for v in params]
1382
        all_os[name][node_name].append((path, status, diagnose,
1383
                                        variants, params, api_versions))
1384
    return all_os
1385

    
1386
  def _GetQueryData(self, lu):
1387
    """Computes the list of nodes and their attributes.
1388

1389
    """
1390
    # Locking is not used
1391
    assert not (compat.any(lu.glm.is_owned(level)
1392
                           for level in locking.LEVELS
1393
                           if level != locking.LEVEL_CLUSTER) or
1394
                self.do_locking or self.use_locking)
1395

    
1396
    valid_nodes = [node.name
1397
                   for node in lu.cfg.GetAllNodesInfo().values()
1398
                   if not node.offline and node.vm_capable]
1399
    pol = self._DiagnoseByOS(lu.rpc.call_os_diagnose(valid_nodes))
1400
    cluster = lu.cfg.GetClusterInfo()
1401

    
1402
    data = {}
1403

    
1404
    for (os_name, os_data) in pol.items():
1405
      info = query.OsInfo(name=os_name, valid=True, node_status=os_data,
1406
                          hidden=(os_name in cluster.hidden_os),
1407
                          blacklisted=(os_name in cluster.blacklisted_os))
1408

    
1409
      variants = set()
1410
      parameters = set()
1411
      api_versions = set()
1412

    
1413
      for idx, osl in enumerate(os_data.values()):
1414
        info.valid = bool(info.valid and osl and osl[0][1])
1415
        if not info.valid:
1416
          break
1417

    
1418
        (node_variants, node_params, node_api) = osl[0][3:6]
1419
        if idx == 0:
1420
          # First entry
1421
          variants.update(node_variants)
1422
          parameters.update(node_params)
1423
          api_versions.update(node_api)
1424
        else:
1425
          # Filter out inconsistent values
1426
          variants.intersection_update(node_variants)
1427
          parameters.intersection_update(node_params)
1428
          api_versions.intersection_update(node_api)
1429

    
1430
      info.variants = list(variants)
1431
      info.parameters = list(parameters)
1432
      info.api_versions = list(api_versions)
1433

    
1434
      data[os_name] = info
1435

    
1436
    # Prepare data in requested order
1437
    return [data[name] for name in self._GetNames(lu, pol.keys(), None)
1438
            if name in data]
1439

    
1440

    
1441
class LUOsDiagnose(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  REQ_BGL = False

  @staticmethod
  def _BuildFilter(fields, names):
    """Builds a filter for querying OSes.

    """
    name_filter = qlang.MakeSimpleFilter("name", names)

    # Legacy behaviour: Hide hidden, blacklisted or invalid OSes if the
    # respective field is not requested
    status_filter = [[qlang.OP_NOT, [qlang.OP_TRUE, fname]]
                     for fname in ["hidden", "blacklisted"]
                     if fname not in fields]
    if "valid" not in fields:
      status_filter.append([qlang.OP_TRUE, "valid"])

    if status_filter:
      status_filter.insert(0, qlang.OP_AND)
    else:
      status_filter = None

    if name_filter and status_filter:
      return [qlang.OP_AND, name_filter, status_filter]
    elif name_filter:
      return name_filter
    else:
      return status_filter

  def CheckArguments(self):
    self.oq = _OsQuery(self._BuildFilter(self.op.output_fields, self.op.names),
                       self.op.output_fields, False)

  def ExpandNames(self):
    self.oq.ExpandNames(self)

  def Exec(self, feedback_fn):
    return self.oq.OldStyleQuery(self)


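# A minimal sketch of the filter produced by LUOsDiagnose._BuildFilter above
# (not part of the original code).  Assuming a query that asks only for the
# fields ["name", "variants"] and restricts itself to the example OS name
# "debian-etch", the combined filter would look roughly like:
#
#   [qlang.OP_AND,
#    qlang.MakeSimpleFilter("name", ["debian-etch"]),
#    [qlang.OP_AND,
#     [qlang.OP_NOT, [qlang.OP_TRUE, "hidden"]],
#     [qlang.OP_NOT, [qlang.OP_TRUE, "blacklisted"]],
#     [qlang.OP_TRUE, "valid"]]]
#
# i.e. hidden, blacklisted and invalid OSes are filtered out whenever the
# corresponding field is not explicitly requested.

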
class _ExtStorageQuery(_QueryBase):
  FIELDS = query.EXTSTORAGE_FIELDS

  def ExpandNames(self, lu):
    # Lock all nodes in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    lu.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

    # The following variables interact with _QueryBase._GetNames
    if self.names:
      self.wanted = self.names
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = self.use_locking

  def DeclareLocks(self, lu, level):
    pass

  @staticmethod
  def _DiagnoseByProvider(rlist):
    """Remaps a per-node return list into a per-provider per-node dictionary

    @param rlist: a map with node names as keys and ExtStorage objects as values

    @rtype: dict
    @return: a dictionary with extstorage providers as keys and as
        value another map, with nodes as keys and tuples of
        (path, status, diagnose, parameters) as values, eg::

          {"provider1": {"node1": [(/usr/lib/..., True, "", [])]
                         "node2": [(/srv/..., False, "missing file")]
                         "node3": [(/srv/..., True, "", [])]
          }

    """
    all_es = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all providers invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].fail_msg]
    for node_name, nr in rlist.items():
      if nr.fail_msg or not nr.payload:
        continue
      for (name, path, status, diagnose, params) in nr.payload:
        if name not in all_es:
          # build a list of nodes for this provider containing empty lists
          # for each node in node_list
          all_es[name] = {}
          for nname in good_nodes:
            all_es[name][nname] = []
        # convert params from [name, help] to (name, help)
        params = [tuple(v) for v in params]
        all_es[name][node_name].append((path, status, diagnose, params))
    return all_es

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    valid_nodes = [node.name
                   for node in lu.cfg.GetAllNodesInfo().values()
                   if not node.offline and node.vm_capable]
    pol = self._DiagnoseByProvider(lu.rpc.call_extstorage_diagnose(valid_nodes))

    data = {}

    nodegroup_list = lu.cfg.GetNodeGroupList()

    for (es_name, es_data) in pol.items():
      # For every provider compute the nodegroup validity.
      # To do this we need to check the validity of each node in es_data
      # and then construct the corresponding nodegroup dict:
      #      { nodegroup1: status
      #        nodegroup2: status
      #      }
      ndgrp_data = {}
      for nodegroup in nodegroup_list:
        ndgrp = lu.cfg.GetNodeGroup(nodegroup)

        nodegroup_nodes = ndgrp.members
        nodegroup_name = ndgrp.name
        node_statuses = []

        for node in nodegroup_nodes:
          if node in valid_nodes:
            if es_data[node] != []:
              node_status = es_data[node][0][1]
              node_statuses.append(node_status)
            else:
              node_statuses.append(False)

        if False in node_statuses:
          ndgrp_data[nodegroup_name] = False
        else:
          ndgrp_data[nodegroup_name] = True

      # Compute the provider's parameters
      parameters = set()
      for idx, esl in enumerate(es_data.values()):
        valid = bool(esl and esl[0][1])
        if not valid:
          break

        node_params = esl[0][3]
        if idx == 0:
          # First entry
          parameters.update(node_params)
        else:
          # Filter out inconsistent values
          parameters.intersection_update(node_params)

      params = list(parameters)

      # Now fill all the info for this provider
      info = query.ExtStorageInfo(name=es_name, node_status=es_data,
                                  nodegroup_status=ndgrp_data,
                                  parameters=params)

      data[es_name] = info

    # Prepare data in requested order
    return [data[name] for name in self._GetNames(lu, pol.keys(), None)
            if name in data]


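# Illustrative sketch (not part of the original code): the nodegroup status
# computed above is an AND over the group's usable members.  Given a group
# "group1" with members "node1" and "node2", where es_data lists a valid entry
# for node1 but an empty list for node2, node_statuses becomes [True, False]
# and ndgrp_data["group1"] is set to False; only when every vm-capable, online
# member reports the provider as valid does the group status become True.  All
# names used here are made up for the example.

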
class LUExtStorageDiagnose(NoHooksLU):
  """Logical unit for ExtStorage diagnose/query.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.eq = _ExtStorageQuery(qlang.MakeSimpleFilter("name", self.op.names),
                               self.op.output_fields, False)

  def ExpandNames(self):
    self.eq.ExpandNames(self)

  def Exec(self, feedback_fn):
    return self.eq.OldStyleQuery(self)


class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_name)
    except ValueError:
      pass
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_name)
    assert node is not None

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)

    for instance_name, instance in self.cfg.GetAllInstancesInfo().items():
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first" % instance_name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self, exceptions=[node.name])
    self.context.RemoveNode(node.name)

    # Run post hooks on the node before it's removed
    _RunPostHook(self, node.name)

    result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node,
                                              constants.ETC_HOSTS_REMOVE,
                                              node.name, None)
      result.Raise("Can't update hosts file with new host data")
      _RedistributeAncillaryFiles(self)


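# Usage sketch (not part of the original code): logical units like
# LUNodeRemove above are normally reached through their opcode rather than
# instantiated directly.  Assuming the usual opcode naming, a client would
# submit something like
#
#   op = opcodes.OpNodeRemove(node_name="node4.example.com")
#
# and the job executor would dispatch it to LUNodeRemove, running the
# CheckPrereq/Exec methods shown above; the node name is only an example.

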
class _NodeQuery(_QueryBase):
  FIELDS = query.NODE_FIELDS

  def ExpandNames(self, lu):
    lu.needed_locks = {}
    lu.share_locks = _ShareAll()

    if self.names:
      self.wanted = _GetWantedNodes(lu, self.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = (self.use_locking and
                       query.NQ_LIVE in self.requested_data)

    if self.do_locking:
      # If any non-static field is requested we need to lock the nodes
      lu.needed_locks[locking.LEVEL_NODE] = self.wanted
      lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    all_info = lu.cfg.GetAllNodesInfo()

    nodenames = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)

    # Gather data as requested
    if query.NQ_LIVE in self.requested_data:
      # filter out non-vm_capable nodes
      toquery_nodes = [name for name in nodenames if all_info[name].vm_capable]

      es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, toquery_nodes)
      node_data = lu.rpc.call_node_info(toquery_nodes, [lu.cfg.GetVGName()],
                                        [lu.cfg.GetHypervisorType()], es_flags)
      live_data = dict((name, rpc.MakeLegacyNodeInfo(nresult.payload))
                       for (name, nresult) in node_data.items()
                       if not nresult.fail_msg and nresult.payload)
    else:
      live_data = None

    if query.NQ_INST in self.requested_data:
      node_to_primary = dict([(name, set()) for name in nodenames])
      node_to_secondary = dict([(name, set()) for name in nodenames])

      inst_data = lu.cfg.GetAllInstancesInfo()

      for inst in inst_data.values():
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)
    else:
      node_to_primary = None
      node_to_secondary = None

    if query.NQ_OOB in self.requested_data:
      oob_support = dict((name, bool(_SupportsOob(lu.cfg, node)))
                         for name, node in all_info.iteritems())
    else:
      oob_support = None

    if query.NQ_GROUP in self.requested_data:
      groups = lu.cfg.GetAllNodeGroupsInfo()
    else:
      groups = {}

    return query.NodeQueryData([all_info[name] for name in nodenames],
                               live_data, lu.cfg.GetMasterNode(),
                               node_to_primary, node_to_secondary, groups,
                               oob_support, lu.cfg.GetClusterInfo())


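# Illustrative sketch (not part of the original code): when query.NQ_INST is
# requested, _NodeQuery._GetQueryData above builds reverse mappings from node
# names to instance names, e.g. for a mirrored instance "inst1" with primary
# "node1" and secondary "node2":
#
#   node_to_primary   == {"node1": set(["inst1"]), "node2": set()}
#   node_to_secondary == {"node1": set(), "node2": set(["inst1"])}
#
# The instance and node names here are invented for the example.

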
class LUNodeQuery(NoHooksLU):
  """Logical unit for querying nodes.

  """
  # pylint: disable=W0142
  REQ_BGL = False

  def CheckArguments(self):
    self.nq = _NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
                         self.op.output_fields, self.op.use_locking)

  def ExpandNames(self):
    self.nq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.nq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    return self.nq.OldStyleQuery(self)


class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = _ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: _GetWantedNodes(self, self.op.nodes),
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.owned_locks(locking.LEVEL_NODE)
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = self.cfg.GetAllInstancesInfo()
    vol2inst = _MapInstanceDisksToNodes(ilist.values())

    output = []
    for node in nodenames:
      nresult = volumes[node]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
        continue

      node_vols = sorted(nresult.payload,
                         key=operator.itemgetter("dev"))

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol["dev"]
          elif field == "vg":
            val = vol["vg"]
          elif field == "name":
            val = vol["name"]
          elif field == "size":
            val = int(float(vol["size"]))
          elif field == "instance":
            val = vol2inst.get((node, vol["vg"] + "/" + vol["name"]), "-")
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


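# Illustrative sketch (not part of the original code): the volume-to-instance
# lookup above is keyed by (node, "<vg>/<lv>").  For a volume dict such as
# {"dev": "/dev/sda1", "vg": "xenvg", "name": "inst1-disk0", "size": 10240}
# reported by "node1", the owning instance would be looked up as
#
#   vol2inst.get(("node1", "xenvg/inst1-disk0"), "-")
#
# and with output_fields ["node", "name", "size"] the emitted row would be
# ["node1", "inst1-disk0", "10240"].  All names and sizes are example values.

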
class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = _ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: _GetWantedNodes(self, self.op.nodes),
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    self.nodes = self.owned_locks(locking.LEVEL_NODE)

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.nodes,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node in utils.NiceSort(self.nodes):
      nresult = data[node]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node
          elif field == constants.SF_TYPE:
            val = self.op.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result


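# Illustrative sketch (not part of the original code): the RPC above is only
# asked for per-storage-unit fields, while constants.SF_NODE and
# constants.SF_TYPE are filled in locally by the LU.  If the caller requested
# [constants.SF_NODE, constants.SF_NAME, <some storage field>], the node RPC
# would be sent [constants.SF_NAME, <some storage field>] (SF_NAME is added if
# missing so rows can be keyed and sorted by name, SF_NODE/SF_TYPE are
# stripped), and each output row is then rebuilt via field_idx with the node
# name and storage type substituted in place.  Which extra fields exist
# depends on the storage type and is left unspecified here.

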
class _InstanceQuery(_QueryBase):
1991
  FIELDS = query.INSTANCE_FIELDS
1992

    
1993
  def ExpandNames(self, lu):
1994
    lu.needed_locks = {}
1995
    lu.share_locks = _ShareAll()
1996

    
1997
    if self.names:
1998
      self.wanted = _GetWantedInstances(lu, self.names)
1999
    else:
2000
      self.wanted = locking.ALL_SET
2001

    
2002
    self.do_locking = (self.use_locking and
2003
                       query.IQ_LIVE in self.requested_data)
2004
    if self.do_locking:
2005
      lu.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2006
      lu.needed_locks[locking.LEVEL_NODEGROUP] = []
2007
      lu.needed_locks[locking.LEVEL_NODE] = []
2008
      lu.needed_locks[locking.LEVEL_NETWORK] = []
2009
      lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2010

    
2011
    self.do_grouplocks = (self.do_locking and
2012
                          query.IQ_NODES in self.requested_data)
2013

    
2014
  def DeclareLocks(self, lu, level):
2015
    if self.do_locking:
2016
      if level == locking.LEVEL_NODEGROUP and self.do_grouplocks:
2017
        assert not lu.needed_locks[locking.LEVEL_NODEGROUP]
2018

    
2019
        # Lock all groups used by instances optimistically; this requires going
2020
        # via the node before it's locked, requiring verification later on
2021
        lu.needed_locks[locking.LEVEL_NODEGROUP] = \
2022
          set(group_uuid
2023
              for instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
2024
              for group_uuid in lu.cfg.GetInstanceNodeGroups(instance_name))
2025
      elif level == locking.LEVEL_NODE:
2026
        lu._LockInstancesNodes() # pylint: disable=W0212
2027

    
2028
      elif level == locking.LEVEL_NETWORK:
2029
        lu.needed_locks[locking.LEVEL_NETWORK] = \
2030
          frozenset(net_uuid
2031
                    for instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
2032
                    for net_uuid in lu.cfg.GetInstanceNetworks(instance_name))
2033

    
2034
  @staticmethod
2035
  def _CheckGroupLocks(lu):
2036
    owned_instances = frozenset(lu.owned_locks(locking.LEVEL_INSTANCE))
2037
    owned_groups = frozenset(lu.owned_locks(locking.LEVEL_NODEGROUP))
2038

    
2039
    # Check if node groups for locked instances are still correct
2040
    for instance_name in owned_instances:
2041
      _CheckInstanceNodeGroups(lu.cfg, instance_name, owned_groups)
2042

    
2043
  def _GetQueryData(self, lu):
2044
    """Computes the list of instances and their attributes.
2045

2046
    """
2047
    if self.do_grouplocks:
2048
      self._CheckGroupLocks(lu)
2049

    
2050
    cluster = lu.cfg.GetClusterInfo()
2051
    all_info = lu.cfg.GetAllInstancesInfo()
2052

    
2053
    instance_names = self._GetNames(lu, all_info.keys(), locking.LEVEL_INSTANCE)
2054

    
2055
    instance_list = [all_info[name] for name in instance_names]
2056
    nodes = frozenset(itertools.chain(*(inst.all_nodes
2057
                                        for inst in instance_list)))
2058
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
2059
    bad_nodes = []
2060
    offline_nodes = []
2061
    wrongnode_inst = set()
2062

    
2063
    # Gather data as requested
2064
    if self.requested_data & set([query.IQ_LIVE, query.IQ_CONSOLE]):
2065
      live_data = {}
2066
      node_data = lu.rpc.call_all_instances_info(nodes, hv_list)
2067
      for name in nodes:
2068
        result = node_data[name]
2069
        if result.offline:
2070
          # offline nodes will be in both lists
2071
          assert result.fail_msg
2072
          offline_nodes.append(name)
2073
        if result.fail_msg:
2074
          bad_nodes.append(name)
2075
        elif result.payload:
2076
          for inst in result.payload:
2077
            if inst in all_info:
2078
              if all_info[inst].primary_node == name:
2079
                live_data.update(result.payload)
2080
              else:
2081
                wrongnode_inst.add(inst)
2082
            else:
2083
              # orphan instance; we don't list it here as we don't
2084
              # handle this case yet in the output of instance listing
2085
              logging.warning("Orphan instance '%s' found on node %s",
2086
                              inst, name)
2087
        # else no instance is alive
2088
    else:
2089
      live_data = {}
2090

    
2091
    if query.IQ_DISKUSAGE in self.requested_data:
2092
      gmi = ganeti.masterd.instance
2093
      disk_usage = dict((inst.name,
2094
                         gmi.ComputeDiskSize(inst.disk_template,
2095
                                             [{constants.IDISK_SIZE: disk.size}
2096
                                              for disk in inst.disks]))
2097
                        for inst in instance_list)
2098
    else:
2099
      disk_usage = None
2100

    
2101
    if query.IQ_CONSOLE in self.requested_data:
2102
      consinfo = {}
2103
      for inst in instance_list:
2104
        if inst.name in live_data:
2105
          # Instance is running
2106
          consinfo[inst.name] = _GetInstanceConsole(cluster, inst)
2107
        else:
2108
          consinfo[inst.name] = None
2109
      assert set(consinfo.keys()) == set(instance_names)
2110
    else:
2111
      consinfo = None
2112

    
2113
    if query.IQ_NODES in self.requested_data:
2114
      node_names = set(itertools.chain(*map(operator.attrgetter("all_nodes"),
2115
                                            instance_list)))
2116
      nodes = dict(lu.cfg.GetMultiNodeInfo(node_names))
2117
      groups = dict((uuid, lu.cfg.GetNodeGroup(uuid))
2118
                    for uuid in set(map(operator.attrgetter("group"),
2119
                                        nodes.values())))
2120
    else:
2121
      nodes = None
2122
      groups = None
2123

    
2124
    if query.IQ_NETWORKS in self.requested_data:
2125
      net_uuids = itertools.chain(*(lu.cfg.GetInstanceNetworks(i.name)
2126
                                    for i in instance_list))
2127
      networks = dict((uuid, lu.cfg.GetNetwork(uuid)) for uuid in net_uuids)
2128
    else:
2129
      networks = None
2130

    
2131
    return query.InstanceQueryData(instance_list, lu.cfg.GetClusterInfo(),
2132
                                   disk_usage, offline_nodes, bad_nodes,
2133
                                   live_data, wrongnode_inst, consinfo,
2134
                                   nodes, groups, networks)


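# Illustrative sketch (not part of the original code): when
# query.IQ_DISKUSAGE is requested, _InstanceQuery above computes per-instance
# disk usage via ganeti.masterd.instance.ComputeDiskSize, passing the disk
# template and one {constants.IDISK_SIZE: size} dict per disk, e.g. for an
# instance with two 1024 MiB disks roughly
#
#   gmi.ComputeDiskSize(inst.disk_template,
#                       [{constants.IDISK_SIZE: 1024},
#                        {constants.IDISK_SIZE: 1024}])
#
# The sizes here are example values only.

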
class LUQuery(NoHooksLU):
  """Query for resources/items of a certain kind.

  """
  # pylint: disable=W0142
  REQ_BGL = False

  def CheckArguments(self):
    qcls = _GetQueryImplementation(self.op.what)

    self.impl = qcls(self.op.qfilter, self.op.fields, self.op.use_locking)

  def ExpandNames(self):
    self.impl.ExpandNames(self)

  def DeclareLocks(self, level):
    self.impl.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    return self.impl.NewStyleQuery(self)


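# Usage sketch (not part of the original code): LUQuery above delegates to the
# _*Query classes defined in this module via _GetQueryImplementation.
# Assuming the usual resource constants, a query for OS definitions would be
# submitted roughly as
#
#   op = opcodes.OpQuery(what=constants.QR_OS, fields=["name", "valid"],
#                        qfilter=None)
#
# and _GetQueryImplementation(constants.QR_OS) would hand back the matching
# query class; the constant and field names are assumptions used only for
# illustration.

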
class LUQueryFields(NoHooksLU):
  """Query the available fields for resources/items of a certain kind.

  """
  # pylint: disable=W0142
  REQ_BGL = False

  def CheckArguments(self):
    self.qcls = _GetQueryImplementation(self.op.what)

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    return query.QueryFields(self.qcls.FIELDS, self.op.fields)


class LUNodeModifyStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)

    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_name,
      }

  def Exec(self, feedback_fn):
    """Modifies a storage volume on the target node.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_name,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


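# Illustrative sketch (not part of the original code): CheckArguments above
# rejects any change not listed in constants.MODIFIABLE_STORAGE_FIELDS for the
# given storage type.  For example, with modifiable == set(["allocatable"])
# and self.op.changes == {"allocatable": False, "size": 2048}, the difference
# {"size"} is non-empty and an OpPrereqError is raised; the field names and
# values here are examples only ("allocatable" is assumed to be a typical
# modifiable field).

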
class LUNodeAdd(LogicalUnit):
2219
  """Logical unit for adding node to the cluster.
2220

2221
  """
2222
  HPATH = "node-add"
2223
  HTYPE = constants.HTYPE_NODE
2224
  _NFLAGS = ["master_capable", "vm_capable"]
2225

    
2226
  def CheckArguments(self):
2227
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
2228
    # validate/normalize the node name
2229
    self.hostname = netutils.GetHostname(name=self.op.node_name,
2230
                                         family=self.primary_ip_family)
2231
    self.op.node_name = self.hostname.name
2232

    
2233
    if self.op.readd and self.op.node_name == self.cfg.GetMasterNode():
2234
      raise errors.OpPrereqError("Cannot readd the master node",
2235
                                 errors.ECODE_STATE)
2236

    
2237
    if self.op.readd and self.op.group:
2238
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
2239
                                 " being readded", errors.ECODE_INVAL)
2240

    
2241
  def BuildHooksEnv(self):
2242
    """Build hooks env.
2243

2244
    This will run on all nodes before, and on all nodes + the new node after.
2245

2246
    """
2247
    return {
2248
      "OP_TARGET": self.op.node_name,
2249
      "NODE_NAME": self.op.node_name,
2250
      "NODE_PIP": self.op.primary_ip,
2251
      "NODE_SIP": self.op.secondary_ip,
2252
      "MASTER_CAPABLE": str(self.op.master_capable),
2253
      "VM_CAPABLE": str(self.op.vm_capable),
2254
      }
2255

    
2256
  def BuildHooksNodes(self):
2257
    """Build hooks nodes.
2258

2259
    """
2260
    # Exclude added node
2261
    pre_nodes = list(set(self.cfg.GetNodeList()) - set([self.op.node_name]))
2262
    post_nodes = pre_nodes + [self.op.node_name, ]
2263

    
2264
    return (pre_nodes, post_nodes)
2265

    
2266
  def CheckPrereq(self):
2267
    """Check prerequisites.
2268

2269
    This checks:
2270
     - the new node is not already in the config
2271
     - it is resolvable
2272
     - its parameters (single/dual homed) matches the cluster
2273

2274
    Any errors are signaled by raising errors.OpPrereqError.
2275

2276
    """
2277
    cfg = self.cfg
2278
    hostname = self.hostname
2279
    node = hostname.name
2280
    primary_ip = self.op.primary_ip = hostname.ip
2281
    if self.op.secondary_ip is None:
2282
      if self.primary_ip_family == netutils.IP6Address.family:
2283
        raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
2284
                                   " IPv4 address must be given as secondary",
2285
                                   errors.ECODE_INVAL)
2286
      self.op.secondary_ip = primary_ip
2287

    
2288
    secondary_ip = self.op.secondary_ip
2289
    if not netutils.IP4Address.IsValid(secondary_ip):
2290
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
2291
                                 " address" % secondary_ip, errors.ECODE_INVAL)
2292

    
2293
    node_list = cfg.GetNodeList()
2294
    if not self.op.readd and node in node_list:
2295
      raise errors.OpPrereqError("Node %s is already in the configuration" %
2296
                                 node, errors.ECODE_EXISTS)
2297
    elif self.op.readd and node not in node_list:
2298
      raise errors.OpPrereqError("Node %s is not in the configuration" % node,
2299
                                 errors.ECODE_NOENT)
2300

    
2301
    self.changed_primary_ip = False
2302

    
2303
    for existing_node_name, existing_node in cfg.GetMultiNodeInfo(node_list):
2304
      if self.op.readd and node == existing_node_name:
2305
        if existing_node.secondary_ip != secondary_ip:
2306
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
2307
                                     " address configuration as before",
2308
                                     errors.ECODE_INVAL)
2309
        if existing_node.primary_ip != primary_ip:
2310
          self.changed_primary_ip = True
2311

    
2312
        continue
2313

    
2314
      if (existing_node.primary_ip == primary_ip or
2315
          existing_node.secondary_ip == primary_ip or
2316
          existing_node.primary_ip == secondary_ip or
2317
          existing_node.secondary_ip == secondary_ip):
2318
        raise errors.OpPrereqError("New node ip address(es) conflict with"
2319
                                   " existing node %s" % existing_node.name,
2320
                                   errors.ECODE_NOTUNIQUE)
2321

    
2322
    # After this 'if' block, None is no longer a valid value for the
2323
    # _capable op attributes
2324
    if self.op.readd:
2325
      old_node = self.cfg.GetNodeInfo(node)
2326
      assert old_node is not None, "Can't retrieve locked node %s" % node
2327
      for attr in self._NFLAGS:
2328
        if getattr(self.op, attr) is None:
2329
          setattr(self.op, attr, getattr(old_node, attr))
2330
    else:
2331
      for attr in self._NFLAGS:
2332
        if getattr(self.op, attr) is None:
2333
          setattr(self.op, attr, True)
2334

    
2335
    if self.op.readd and not self.op.vm_capable:
2336
      pri, sec = cfg.GetNodeInstances(node)
2337
      if pri or sec:
2338
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
2339
                                   " flag set to false, but it already holds"
2340
                                   " instances" % node,
2341
                                   errors.ECODE_STATE)
2342

    
2343
    # check that the type of the node (single versus dual homed) is the
2344
    # same as for the master
2345
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2346
    master_singlehomed = myself.secondary_ip == myself.primary_ip
2347
    newbie_singlehomed = secondary_ip == primary_ip
2348
    if master_singlehomed != newbie_singlehomed:
2349
      if master_singlehomed:
2350
        raise errors.OpPrereqError("The master has no secondary ip but the"
2351
                                   " new node has one",
2352
                                   errors.ECODE_INVAL)
2353
      else:
2354
        raise errors.OpPrereqError("The master has a secondary ip but the"
2355
                                   " new node doesn't have one",
2356
                                   errors.ECODE_INVAL)
2357

    
2358
    # checks reachability
2359
    if not netutils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2360
      raise errors.OpPrereqError("Node not reachable by ping",
2361
                                 errors.ECODE_ENVIRON)
2362

    
2363
    if not newbie_singlehomed:
2364
      # check reachability from my secondary ip to newbie's secondary ip
2365
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2366
                              source=myself.secondary_ip):
2367
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2368
                                   " based ping to node daemon port",
2369
                                   errors.ECODE_ENVIRON)
2370

    
2371
    if self.op.readd:
2372
      exceptions = [node]
2373
    else:
2374
      exceptions = []
2375

    
2376
    if self.op.master_capable:
2377
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
2378
    else:
2379
      self.master_candidate = False
2380

    
2381
    if self.op.readd:
2382
      self.new_node = old_node
2383
    else:
2384
      node_group = cfg.LookupNodeGroup(self.op.group)
2385
      self.new_node = objects.Node(name=node,
2386
                                   primary_ip=primary_ip,
2387
                                   secondary_ip=secondary_ip,
2388
                                   master_candidate=self.master_candidate,
2389
                                   offline=False, drained=False,
2390
                                   group=node_group, ndparams={})
2391

    
2392
    if self.op.ndparams:
2393
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
2394
      _CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
2395
                            "node", "cluster or group")
2396

    
2397
    if self.op.hv_state:
2398
      self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state, None)
2399

    
2400
    if self.op.disk_state:
2401
      self.new_disk_state = _MergeAndVerifyDiskState(self.op.disk_state, None)
2402

    
2403
    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
2404
    #       it a property on the base class.
2405
    rpcrunner = rpc.DnsOnlyRunner()
2406
    result = rpcrunner.call_version([node])[node]
2407
    result.Raise("Can't get version information from node %s" % node)
2408
    if constants.PROTOCOL_VERSION == result.payload:
2409
      logging.info("Communication to node %s fine, sw version %s match",
2410
                   node, result.payload)
2411
    else:
2412
      raise errors.OpPrereqError("Version mismatch master version %s,"
2413
                                 " node version %s" %
2414
                                 (constants.PROTOCOL_VERSION, result.payload),
2415
                                 errors.ECODE_ENVIRON)
2416

    
2417
    vg_name = cfg.GetVGName()
2418
    if vg_name is not None:
2419
      vparams = {constants.NV_PVLIST: [vg_name]}
2420
      excl_stor = _IsExclusiveStorageEnabledNode(cfg, self.new_node)
2421
      cname = self.cfg.GetClusterName()
2422
      result = rpcrunner.call_node_verify_light([node], vparams, cname)[node]
2423
      (errmsgs, _) = _CheckNodePVs(result.payload, excl_stor)
2424
      if errmsgs:
2425
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
2426
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)
2427

    
2428
  def Exec(self, feedback_fn):
2429
    """Adds the new node to the cluster.
2430

2431
    """
2432
    new_node = self.new_node
2433
    node = new_node.name
2434

    
2435
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
2436
      "Not owning BGL"
2437

    
2438
    # We are adding a new node, so we assume it's powered
2439
    new_node.powered = True
2440

    
2441
    # for re-adds, reset the offline/drained/master-candidate flags;
2442
    # we need to reset here, otherwise offline would prevent RPC calls
2443
    # later in the procedure; this also means that if the re-add
2444
    # fails, we are left with a non-offlined, broken node
2445
    if self.op.readd:
2446
      new_node.drained = new_node.offline = False # pylint: disable=W0201
2447
      self.LogInfo("Readding a node, the offline/drained flags were reset")
2448
      # if we demote the node, we do cleanup later in the procedure
2449
      new_node.master_candidate = self.master_candidate
2450
      if self.changed_primary_ip:
2451
        new_node.primary_ip = self.op.primary_ip
2452

    
2453
    # copy the master/vm_capable flags
2454
    for attr in self._NFLAGS:
2455
      setattr(new_node, attr, getattr(self.op, attr))
2456

    
2457
    # notify the user about any possible mc promotion
2458
    if new_node.master_candidate:
2459
      self.LogInfo("Node will be a master candidate")
2460

    
2461
    if self.op.ndparams:
2462
      new_node.ndparams = self.op.ndparams
2463
    else:
2464
      new_node.ndparams = {}
2465

    
2466
    if self.op.hv_state:
2467
      new_node.hv_state_static = self.new_hv_state
2468

    
2469
    if self.op.disk_state:
2470
      new_node.disk_state_static = self.new_disk_state
2471

    
2472
    # Add node to our /etc/hosts, and add key to known_hosts
2473
    if self.cfg.GetClusterInfo().modify_etc_hosts:
2474
      master_node = self.cfg.GetMasterNode()
2475
      result = self.rpc.call_etc_hosts_modify(master_node,
2476
                                              constants.ETC_HOSTS_ADD,
2477
                                              self.hostname.name,
2478
                                              self.hostname.ip)
2479
      result.Raise("Can't update hosts file with new host data")
2480

    
2481
    if new_node.secondary_ip != new_node.primary_ip:
2482
      _CheckNodeHasSecondaryIP(self, new_node.name, new_node.secondary_ip,
2483
                               False)
2484

    
2485
    node_verify_list = [self.cfg.GetMasterNode()]
2486
    node_verify_param = {
2487
      constants.NV_NODELIST: ([node], {}),
2488
      # TODO: do a node-net-test as well?
2489
    }
2490

    
2491
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2492
                                       self.cfg.GetClusterName())
2493
    for verifier in node_verify_list:
2494
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
2495
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
2496
      if nl_payload:
2497
        for failed in nl_payload:
2498
          feedback_fn("ssh/hostname verification failed"
2499
                      " (checking from %s): %s" %
2500
                      (verifier, nl_payload[failed]))
2501
        raise errors.OpExecError("ssh/hostname verification failed")
2502

    
2503
    if self.op.readd:
2504
      _RedistributeAncillaryFiles(self)
2505
      self.context.ReaddNode(new_node)
2506
      # make sure we redistribute the config
2507
      self.cfg.Update(new_node, feedback_fn)
2508
      # and make sure the new node will not have old files around
2509
      if not new_node.master_candidate:
2510
        result = self.rpc.call_node_demote_from_mc(new_node.name)
2511
        msg = result.fail_msg
2512
        if msg:
2513
          self.LogWarning("Node failed to demote itself from master"
2514
                          " candidate status: %s" % msg)
2515
    else:
2516
      _RedistributeAncillaryFiles(self, additional_nodes=[node],
2517
                                  additional_vm=self.op.vm_capable)
2518
      self.context.AddNode(new_node, self.proc.GetECId())


class LUNodeSetParams(LogicalUnit):
2522
  """Modifies the parameters of a node.
2523

2524
  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
2525
      to the node role (as _ROLE_*)
2526
  @cvar _R2F: a dictionary from node role to tuples of flags
2527
  @cvar _FLAGS: a list of attribute names corresponding to the flags
2528

2529
  """
2530
  HPATH = "node-modify"
2531
  HTYPE = constants.HTYPE_NODE
2532
  REQ_BGL = False
2533
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
2534
  _F2R = {
2535
    (True, False, False): _ROLE_CANDIDATE,
2536
    (False, True, False): _ROLE_DRAINED,
2537
    (False, False, True): _ROLE_OFFLINE,
2538
    (False, False, False): _ROLE_REGULAR,
2539
    }
2540
  _R2F = dict((v, k) for k, v in _F2R.items())
2541
  _FLAGS = ["master_candidate", "drained", "offline"]
2542

    
2543
  def CheckArguments(self):
2544
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
2545
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
2546
                self.op.master_capable, self.op.vm_capable,
2547
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
2548
                self.op.disk_state]
2549
    if all_mods.count(None) == len(all_mods):
2550
      raise errors.OpPrereqError("Please pass at least one modification",
2551
                                 errors.ECODE_INVAL)
2552
    if all_mods.count(True) > 1:
2553
      raise errors.OpPrereqError("Can't set the node into more than one"
2554
                                 " state at the same time",
2555
                                 errors.ECODE_INVAL)
2556

    
2557
    # Boolean value that tells us whether we might be demoting from MC
2558
    self.might_demote = (self.op.master_candidate is False or
2559
                         self.op.offline is True or
2560
                         self.op.drained is True or
2561
                         self.op.master_capable is False)
2562

    
2563
    if self.op.secondary_ip:
2564
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
2565
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
2566
                                   " address" % self.op.secondary_ip,
2567
                                   errors.ECODE_INVAL)
2568

    
2569
    self.lock_all = self.op.auto_promote and self.might_demote
2570
    self.lock_instances = self.op.secondary_ip is not None
2571

    
2572
  def _InstanceFilter(self, instance):
2573
    """Filter for getting affected instances.
2574

2575
    """
2576
    return (instance.disk_template in constants.DTS_INT_MIRROR and
2577
            self.op.node_name in instance.all_nodes)
2578

    
2579
  def ExpandNames(self):
2580
    if self.lock_all:
2581
      self.needed_locks = {
2582
        locking.LEVEL_NODE: locking.ALL_SET,
2583

    
2584
        # Block allocations when all nodes are locked
2585
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
2586
        }
2587
    else:
2588
      self.needed_locks = {
2589
        locking.LEVEL_NODE: self.op.node_name,
2590
        }
2591

    
2592
    # Since modifying a node can have severe effects on currently running
2593
    # operations the resource lock is at least acquired in shared mode
2594
    self.needed_locks[locking.LEVEL_NODE_RES] = \
2595
      self.needed_locks[locking.LEVEL_NODE]
2596

    
2597
    # Get all locks except nodes in shared mode; they are not used for anything
2598
    # but read-only access
2599
    self.share_locks = _ShareAll()
2600
    self.share_locks[locking.LEVEL_NODE] = 0
2601
    self.share_locks[locking.LEVEL_NODE_RES] = 0
2602
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 0
2603

    
2604
    if self.lock_instances:
2605
      self.needed_locks[locking.LEVEL_INSTANCE] = \
2606
        frozenset(self.cfg.GetInstancesInfoByFilter(self._InstanceFilter))
2607

    
2608
  def BuildHooksEnv(self):
2609
    """Build hooks env.
2610

2611
    This runs on the master node.
2612

2613
    """
2614
    return {
2615
      "OP_TARGET": self.op.node_name,
2616
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2617
      "OFFLINE": str(self.op.offline),
2618
      "DRAINED": str(self.op.drained),
2619
      "MASTER_CAPABLE": str(self.op.master_capable),
2620
      "VM_CAPABLE": str(self.op.vm_capable),
2621
      }
2622

    
2623
  def BuildHooksNodes(self):
2624
    """Build hooks nodes.
2625

2626
    """
2627
    nl = [self.cfg.GetMasterNode(), self.op.node_name]
2628
    return (nl, nl)
2629

    
2630
  def CheckPrereq(self):
2631
    """Check prerequisites.
2632

2633
    This only checks the instance list against the existing names.
2634

2635
    """
2636
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2637

    
2638
    if self.lock_instances:
2639
      affected_instances = \
2640
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
2641

    
2642
      # Verify instance locks
2643
      owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
2644
      wanted_instances = frozenset(affected_instances.keys())
2645
      if wanted_instances - owned_instances:
2646
        raise errors.OpPrereqError("Instances affected by changing node %s's"
2647
                                   " secondary IP address have changed since"
2648
                                   " locks were acquired, wanted '%s', have"
2649
                                   " '%s'; retry the operation" %
2650
                                   (self.op.node_name,
2651
                                    utils.CommaJoin(wanted_instances),
2652
                                    utils.CommaJoin(owned_instances)),
2653
                                   errors.ECODE_STATE)
2654
    else:
2655
      affected_instances = None
2656

    
2657
    if (self.op.master_candidate is not None or
2658
        self.op.drained is not None or
2659
        self.op.offline is not None):
2660
      # we can't change the master's node flags
2661
      if self.op.node_name == self.cfg.GetMasterNode():
2662
        raise errors.OpPrereqError("The master role can be changed"
2663
                                   " only via master-failover",
2664
                                   errors.ECODE_INVAL)
2665

    
2666
    if self.op.master_candidate and not node.master_capable:
2667
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
2668
                                 " it a master candidate" % node.name,
2669
                                 errors.ECODE_STATE)
2670

    
2671
    if self.op.vm_capable is False:
2672
      (ipri, isec) = self.cfg.GetNodeInstances(self.op.node_name)
2673
      if ipri or isec:
2674
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
2675
                                   " the vm_capable flag" % node.name,
2676
                                   errors.ECODE_STATE)
2677

    
2678
    if node.master_candidate and self.might_demote and not self.lock_all:
2679
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
2680
      # check if after removing the current node, we're missing master
2681
      # candidates
2682
      (mc_remaining, mc_should, _) = \
2683
          self.cfg.GetMasterCandidateStats(exceptions=[node.name])
2684
      if mc_remaining < mc_should:
2685
        raise errors.OpPrereqError("Not enough master candidates, please"
2686
                                   " pass auto promote option to allow"
2687
                                   " promotion (--auto-promote or RAPI"
2688
                                   " auto_promote=True)", errors.ECODE_STATE)
2689

    
2690
    self.old_flags = old_flags = (node.master_candidate,
2691
                                  node.drained, node.offline)
2692
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
2693
    self.old_role = old_role = self._F2R[old_flags]
2694

    
2695
    # Check for ineffective changes
2696
    for attr in self._FLAGS:
2697
      if (getattr(self.op, attr) is False and getattr(node, attr) is False):
2698
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
2699
        setattr(self.op, attr, None)
2700

    
2701
    # Past this point, any flag change to False means a transition
2702
    # away from the respective state, as only real changes are kept
2703

    
2704
    # TODO: We might query the real power state if it supports OOB
2705
    if _SupportsOob(self.cfg, node):
2706
      if self.op.offline is False and not (node.powered or
2707
                                           self.op.powered is True):
2708
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
2709
                                    " offline status can be reset") %
2710
                                   self.op.node_name, errors.ECODE_STATE)
2711
    elif self.op.powered is not None:
2712
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
2713
                                  " as it does not support out-of-band"
2714
                                  " handling") % self.op.node_name,
2715
                                 errors.ECODE_STATE)
2716

    
2717
    # If we're being deofflined/drained, we'll MC ourself if needed
2718
    if (self.op.drained is False or self.op.offline is False or
2719
        (self.op.master_capable and not node.master_capable)):
2720
      if _DecideSelfPromotion(self):
2721
        self.op.master_candidate = True
2722
        self.LogInfo("Auto-promoting node to master candidate")
2723

    
2724
    # If we're no longer master capable, we'll demote ourselves from MC
2725
    if self.op.master_capable is False and node.master_candidate:
2726
      self.LogInfo("Demoting from master candidate")
2727
      self.op.master_candidate = False
2728

    
2729
    # Compute new role
2730
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
2731
    if self.op.master_candidate:
2732
      new_role = self._ROLE_CANDIDATE
2733
    elif self.op.drained:
2734
      new_role = self._ROLE_DRAINED
2735
    elif self.op.offline:
2736
      new_role = self._ROLE_OFFLINE
2737
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
2738
      # False is still in new flags, which means we're un-setting (the
2739
      # only) True flag
2740
      new_role = self._ROLE_REGULAR
2741
    else: # no new flags, nothing, keep old role
2742
      new_role = old_role
2743

    
2744
    self.new_role = new_role

    if old_role == self._ROLE_OFFLINE and new_role != old_role:
      # Trying to transition out of offline status
      result = self.rpc.call_version([node.name])[node.name]
      if result.fail_msg:
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
                                   " to report its version: %s" %
                                   (node.name, result.fail_msg),
                                   errors.ECODE_STATE)
      else:
        self.LogWarning("Transitioning node from offline to online state"
                        " without using re-add. Please make sure the node"
                        " is healthy!")

    # When changing the secondary ip, verify if this is a single-homed to
    # multi-homed transition or vice versa, and apply the relevant
    # restrictions.
    if self.op.secondary_ip:
      # Ok even without locking, because this can't be changed by any LU
      master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
      master_singlehomed = master.secondary_ip == master.primary_ip
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
        if self.op.force and node.name == master.name:
          self.LogWarning("Transitioning from single-homed to multi-homed"
                          " cluster; all nodes will require a secondary IP"
                          " address")
        else:
          raise errors.OpPrereqError("Changing the secondary ip on a"
                                     " single-homed cluster requires the"
                                     " --force option to be passed, and the"
                                     " target node to be the master",
                                     errors.ECODE_INVAL)
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
        if self.op.force and node.name == master.name:
          self.LogWarning("Transitioning from multi-homed to single-homed"
                          " cluster; secondary IP addresses will have to be"
                          " removed")
        else:
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
                                     " same as the primary IP on a multi-homed"
                                     " cluster, unless the --force option is"
                                     " passed, and the target node is the"
                                     " master", errors.ECODE_INVAL)

      assert not (frozenset(affected_instances) -
                  self.owned_locks(locking.LEVEL_INSTANCE))

      if node.offline:
        if affected_instances:
          msg = ("Cannot change secondary IP address: offline node has"
                 " instances (%s) configured to use it" %
                 utils.CommaJoin(affected_instances.keys()))
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
      else:
        # On online nodes, check that no instances are running, and that
        # the node has the new ip and we can reach it.
        for instance in affected_instances.values():
          _CheckInstanceState(self, instance, INSTANCE_DOWN,
                              msg="cannot change secondary ip")

        _CheckNodeHasSecondaryIP(self, node.name, self.op.secondary_ip, True)
        if master.name != node.name:
          # check reachability from master secondary ip to new secondary ip
          if not netutils.TcpPing(self.op.secondary_ip,
                                  constants.DEFAULT_NODED_PORT,
                                  source=master.secondary_ip):
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                       " based ping to node daemon port",
                                       errors.ECODE_ENVIRON)

    if self.op.ndparams:
      new_ndparams = _GetUpdatedParams(self.node.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      _CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                            "node", "cluster or group")
      self.new_ndparams = new_ndparams

    if self.op.hv_state:
      self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state,
                                                 self.node.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        _MergeAndVerifyDiskState(self.op.disk_state,
                                 self.node.disk_state_static)

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.node
    old_role = self.old_role
    new_role = self.new_role

    result = []

    if self.op.ndparams:
      node.ndparams = self.new_ndparams

    if self.op.powered is not None:
      node.powered = self.op.powered

    if self.op.hv_state:
      node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      node.disk_state_static = self.new_disk_state

    for attr in ["master_capable", "vm_capable"]:
      val = getattr(self.op, attr)
      if val is not None:
        setattr(node, attr, val)
        result.append((attr, str(val)))

    if new_role != old_role:
      # Tell the node to demote itself, if no longer MC and not offline
      if old_role == self._ROLE_CANDIDATE and new_role != self._ROLE_OFFLINE:
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s", msg)

      new_flags = self._R2F[new_role]
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
        if of != nf:
          result.append((desc, str(nf)))
      (node.master_candidate, node.drained, node.offline) = new_flags

      # we locked all nodes, we adjust the CP before updating this node
      if self.lock_all:
        _AdjustCandidatePool(self, [node.name])

    if self.op.secondary_ip:
      node.secondary_ip = self.op.secondary_ip
      result.append(("secondary_ip", self.op.secondary_ip))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)

    # this will trigger job queue propagation or cleanup if the mc
    # flag changed
    if [old_role, new_role].count(self._ROLE_CANDIDATE) == 1:
      self.context.ReaddNode(node)

    return result


class LUNodePowercycle(NoHooksLU):
  """Powercycles a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    if self.op.node_name == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    result = self.rpc.call_node_powercycle(self.op.node_name,
                                           self.cfg.GetHypervisorType())
    result.Raise("Failed to schedule the reboot")
    return result.payload


class LUInstanceActivateDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              _AssembleInstanceDisks(self, self.instance,
                                     ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    if self.op.wait_for_sync:
      if not _WaitForSync(self, self.instance):
        raise errors.OpExecError("Some disks of the instance are degraded!")

    return disks_info


def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
                           ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type disks: list of L{objects.Disk} or None
  @param disks: which disks to assemble (or all, if None)
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: False if the operation failed, otherwise a list of
      (host, instance_visible_name, node_visible_name)
      with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  disks = _ExpandCheckDisks(instance, disks)

  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occured, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for idx, inst_disk in enumerate(disks):
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
                                             False, idx)
      msg = result.fail_msg
      if msg:
        is_offline_secondary = (node in instance.secondary_nodes and
                                result.offline)
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=False, pass=1): %s",
                      inst_disk.iv_name, node, msg)
        if not (ignore_secondaries or is_offline_secondary):
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for idx, inst_disk in enumerate(disks):
    dev_path = None

    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
                                             True, idx)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=True, pass=2): %s",
                      inst_disk.iv_name, node, msg)
        disks_ok = False
      else:
        dev_path = result.payload

    device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

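  # device_info holds one (primary_node, disk.iv_name, device_path) tuple per
  # assembled disk; device_path stays None if assembling the disk on the
  # primary node failed above.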
  return disks_ok, device_info


def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = _AssembleInstanceDisks(lu, instance,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
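    # force may also be None (callers such as LUInstanceReinstall and
    # LUInstanceRename below pass None); the hint is only printed when the
    # caller explicitly asked for force=False.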
    if force is not None and not force:
      lu.LogWarning("",
                    hint=("If the message above refers to a secondary node,"
                          " you can retry the operation using '--force'"))
    raise errors.OpExecError("Disk consistency error")


class LUInstanceDeactivateDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    if self.op.force:
      _ShutdownInstanceDisks(self, instance)
    else:
      _SafeShutdownInstanceDisks(self, instance)


def _SafeShutdownInstanceDisks(lu, instance, disks=None):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  _CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
  _ShutdownInstanceDisks(lu, instance, disks=disks)


def _ExpandCheckDisks(instance, disks):
  """Return the instance disks selected by the disks list

  @type disks: list of L{objects.Disk} or None
  @param disks: selected disks
  @rtype: list of L{objects.Disk}
  @return: selected instance disks to act on

  """
  if disks is None:
    return instance.disks
  else:
    if not set(disks).issubset(instance.disks):
      raise errors.ProgrammerError("Can only act on disks belonging to the"
                                   " target instance")
    return disks


def _ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is false, errors on the primary node are not
  ignored and make this function return False; if it is true, they
  are ignored.

  """
  all_result = True
  disks = _ExpandCheckDisks(instance, disks)

  for disk in disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if ((node == instance.primary_node and not ignore_primary) or
            (node != instance.primary_node and not result.offline)):
          all_result = False
  return all_result


def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor_name: C{str}
  @param hypervisor_name: the hypervisor to ask for memory stats
  @rtype: integer
  @return: node current free memory
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
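  # For illustration, LUInstanceStartup.CheckPrereq below uses this to check
  # the instance's minimum backend memory before starting it:
  #   _CheckNodeFreeMemory(self, instance.primary_node,
  #                        "starting instance %s" % instance.name,
  #                        bep[constants.BE_MINMEM], instance.hypervisor)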
  nodeinfo = lu.rpc.call_node_info([node], None, [hypervisor_name], False)
  nodeinfo[node].Raise("Can't get data from node %s" % node,
                       prereq=True, ecode=errors.ECODE_ENVIRON)
  (_, _, (hv_info, )) = nodeinfo[node].payload

  free_mem = hv_info.get("memory_free", None)
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem),
                               errors.ECODE_ENVIRON)
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem),
                               errors.ECODE_NORES)
  return free_mem


def _CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
  """Checks if nodes have enough free disk space in all the VGs.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type nodenames: C{list}
  @param nodenames: the list of node names to check
  @type req_sizes: C{dict}
  @param req_sizes: the hash of vg and corresponding amount of disk in
      MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
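  # req_sizes maps each volume group name to the space required from it in
  # MiB, e.g. {"xenvg": 10240} (the VG name here is purely illustrative).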
  for vg, req_size in req_sizes.items():
    _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)


def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
  """Checks if nodes have enough free disk space in the specified VG.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type nodenames: C{list}
  @param nodenames: the list of node names to check
  @type vg: C{str}
  @param vg: the volume group to check
  @type requested: C{int}
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
  nodeinfo = lu.rpc.call_node_info(nodenames, [vg], None, es_flags)
  for node in nodenames:
    info = nodeinfo[node]
    info.Raise("Cannot get current information from node %s" % node,
               prereq=True, ecode=errors.ECODE_ENVIRON)
    (_, (vg_info, ), _) = info.payload
    vg_free = vg_info.get("vg_free", None)
    if not isinstance(vg_free, int):
      raise errors.OpPrereqError("Can't compute free disk space on node"
                                 " %s for vg %s, result was '%s'" %
                                 (node, vg, vg_free), errors.ECODE_ENVIRON)
    if requested > vg_free:
      raise errors.OpPrereqError("Not enough disk space on target node %s"
                                 " vg %s: required %d MiB, available %d MiB" %
                                 (node, vg, requested, vg_free),
                                 errors.ECODE_NORES)


def _CheckNodesPhysicalCPUs(lu, nodenames, requested, hypervisor_name):
  """Checks if nodes have enough physical CPUs

  This function checks if all given nodes have the needed number of
  physical CPUs. In case any node has less CPUs or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type nodenames: C{list}
  @param nodenames: the list of node names to check
  @type requested: C{int}
  @param requested: the minimum acceptable number of physical CPUs
  @raise errors.OpPrereqError: if the node doesn't have enough CPUs,
      or we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info(nodenames, None, [hypervisor_name], None)
  for node in nodenames:
    info = nodeinfo[node]
    info.Raise("Cannot get current information from node %s" % node,
               prereq=True, ecode=errors.ECODE_ENVIRON)
    (_, _, (hv_info, )) = info.payload
    num_cpus = hv_info.get("cpu_total", None)
    if not isinstance(num_cpus, int):
      raise errors.OpPrereqError("Can't compute the number of physical CPUs"
                                 " on node %s, result was '%s'" %
                                 (node, num_cpus), errors.ECODE_ENVIRON)
    if requested > num_cpus:
      raise errors.OpPrereqError("Node %s has %s physical CPUs, but %s are "
                                 "required" % (node, num_cpus, requested),
                                 errors.ECODE_NORES)


class LUInstanceStartup(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    # extra beparams
    if self.op.beparams:
      # fill the beparams dict
      objects.UpgradeBeParams(self.op.beparams)
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES:
      self._LockInstancesNodes(primary_only=True, level=locking.LEVEL_NODE_RES)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }

    env.update(_BuildInstanceHookEnvByObject(self, self.instance))

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # extra hvparams
    if self.op.hvparams:
      # check hypervisor parameter syntax (locally)
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
      filled_hvp = cluster.FillHV(instance)
      filled_hvp.update(self.op.hvparams)
      hv_type = hypervisor.GetHypervisorClass(instance.hypervisor)
      hv_type.CheckParameterSyntax(filled_hvp)
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)

    _CheckInstanceState(self, instance, INSTANCE_ONLINE)

    self.primary_offline = self.cfg.GetNodeInfo(instance.primary_node).offline

    if self.primary_offline and self.op.ignore_offline_nodes:
      self.LogWarning("Ignoring offline primary node")

      if self.op.hvparams or self.op.beparams:
        self.LogWarning("Overridden parameters are ignored")
    else:
      _CheckNodeOnline(self, instance.primary_node)

      bep = self.cfg.GetClusterInfo().FillBE(instance)
      bep.update(self.op.beparams)

      # check bridges existence
      _CheckInstanceBridgesExist(self, instance)

      remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                instance.name,
                                                instance.hypervisor)
      remote_info.Raise("Error checking node %s" % instance.primary_node,
                        prereq=True, ecode=errors.ECODE_ENVIRON)
      if not remote_info.payload: # not running already
        _CheckNodeFreeMemory(self, instance.primary_node,
                             "starting instance %s" % instance.name,
                             bep[constants.BE_MINMEM], instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    reason = self.op.reason

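    # Record the desired state first; when the primary node is offline the
    # instance is only marked as started in the configuration (see below).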
    if not self.op.no_remember:
      self.cfg.MarkInstanceUp(instance.name)

    if self.primary_offline:
      assert self.op.ignore_offline_nodes
      self.LogInfo("Primary node offline, marked instance as started")
    else:
      node_current = instance.primary_node

      _StartInstanceDisks(self, instance, force)

      result = \
        self.rpc.call_instance_start(node_current,
                                     (instance, self.op.hvparams,
                                      self.op.beparams),
                                     self.op.startup_paused, reason)
      msg = result.fail_msg
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance: %s" % msg)


class LUInstanceReboot(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      "REBOOT_TYPE": self.op.reboot_type,
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
      }

    env.update(_BuildInstanceHookEnvByObject(self, self.instance))

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckInstanceState(self, instance, INSTANCE_ONLINE)
    _CheckNodeOnline(self, instance.primary_node)

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    reason = self.op.reason

    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node)
    instance_running = bool(remote_info.payload)

    node_current = instance.primary_node

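    # Soft and hard reboots of a running instance are delegated to the node
    # daemon; any other reboot type, or an instance that is not running, is
    # handled below as a full shutdown followed by a fresh start.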
    if instance_running and reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                                            constants.INSTANCE_REBOOT_HARD]:
      for disk in instance.disks:
        self.cfg.SetDiskID(disk, node_current)
      result = self.rpc.call_instance_reboot(node_current, instance,
                                             reboot_type,
                                             self.op.shutdown_timeout, reason)
      result.Raise("Could not reboot instance")
    else:
      if instance_running:
        result = self.rpc.call_instance_shutdown(node_current, instance,
                                                 self.op.shutdown_timeout,
                                                 reason)
        result.Raise("Could not shutdown instance for full reboot")
        _ShutdownInstanceDisks(self, instance)
      else:
        self.LogInfo("Instance %s was already stopped, starting now",
                     instance.name)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      result = self.rpc.call_instance_start(node_current,
                                            (instance, None, None), False,
                                             reason)
      msg = result.fail_msg
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for"
                                 " full reboot: %s" % msg)

    self.cfg.MarkInstanceUp(instance.name)


class LUInstanceShutdown(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["TIMEOUT"] = self.op.timeout
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if not self.op.force:
      _CheckInstanceState(self, self.instance, INSTANCE_ONLINE)
    else:
      self.LogWarning("Ignoring offline instance check")

    self.primary_offline = \
      self.cfg.GetNodeInfo(self.instance.primary_node).offline

    if self.primary_offline and self.op.ignore_offline_nodes:
      self.LogWarning("Ignoring offline primary node")
    else:
      _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    timeout = self.op.timeout
    reason = self.op.reason

    # If the instance is offline we shouldn't mark it as down, as that
    # resets the offline flag.
    if not self.op.no_remember and instance.admin_state in INSTANCE_ONLINE:
      self.cfg.MarkInstanceDown(instance.name)

    if self.primary_offline:
      assert self.op.ignore_offline_nodes
      self.LogInfo("Primary node offline, marked instance as stopped")
    else:
      result = self.rpc.call_instance_shutdown(node_current, instance, timeout,
                                               reason)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not shutdown instance: %s", msg)

      _ShutdownInstanceDisks(self, instance)


class LUInstanceReinstall(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    return _BuildInstanceHookEnvByObject(self, self.instance)

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node, "Instance primary node"
                     " offline, cannot reinstall")

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name,
                                 errors.ECODE_INVAL)
    _CheckInstanceState(self, instance, INSTANCE_DOWN, msg="cannot reinstall")

    if self.op.os_type is not None:
      # OS verification
      pnode = _ExpandNodeName(self.cfg, instance.primary_node)
      _CheckNodeHasOS(self, pnode, self.op.os_type, self.op.force_variant)
      instance_os = self.op.os_type
    else:
      instance_os = instance.os

    nodelist = list(instance.all_nodes)

    if self.op.osparams:
      i_osdict = _GetUpdatedParams(instance.osparams, self.op.osparams)
      _CheckOSParams(self, True, nodelist, instance_os, i_osdict)
      self.os_inst = i_osdict # the new dict (without defaults)
    else:
      self.os_inst = None

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      # Write to configuration
      self.cfg.Update(inst, feedback_fn)

    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      # FIXME: pass debug option from opcode to backend
      result = self.rpc.call_instance_os_add(inst.primary_node,
                                             (inst, self.os_inst), True,
                                             self.op.debug_level)
      result.Raise("Could not install OS for instance %s on node %s" %
                   (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(self, inst)


class LUInstanceRecreateDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  _MODIFYABLE = compat.UniqueFrozenset([
    constants.IDISK_SIZE,
    constants.IDISK_MODE,
    ])

  # New or changed disk parameters may have different semantics
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
    constants.IDISK_ADOPT,

    # TODO: Implement support changing VG while recreating
    constants.IDISK_VG,
    constants.IDISK_METAVG,
    constants.IDISK_PROVIDER,
    constants.IDISK_NAME,
    ]))

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)

    # FIXME
    # The allocator should actually run in "relocate" mode, but current
    # allocators don't support relocating all the nodes of an instance at
    # the same time. As a workaround we use "allocate" mode, but this is
    # suboptimal for two reasons:
    # - The instance name passed to the allocator is present in the list of
    #   existing instances, so there could be a conflict within the
    #   internal structures of the allocator. This doesn't happen with the
    #   current allocators, but it's a liability.
    # - The allocator counts the resources used by the instance twice: once
    #   because the instance exists already, and once because it tries to
    #   allocate a new instance.
    # The allocator could choose some of the nodes on which the instance is
    # running, but that's not a problem. If the instance nodes are broken,
    # they should be already be marked as drained or offline, and hence
    # skipped by the allocator. If instance disks have been lost for other
    # reasons, then recreating the disks on the same nodes should be fine.
    disk_template = self.instance.disk_template
    spindle_use = be_full[constants.BE_SPINDLE_USE]
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
                                        disk_template=disk_template,
                                        tags=list(self.instance.GetTags()),
                                        os=self.instance.os,
                                        nics=[{}],
                                        vcpus=be_full[constants.BE_VCPUS],
                                        memory=be_full[constants.BE_MAXMEM],
                                        spindle_use=spindle_use,
                                        disks=[{constants.IDISK_SIZE: d.size,
                                                constants.IDISK_MODE: d.mode}
                                                for d in self.instance.disks],
                                        hypervisor=self.instance.hypervisor,
                                        node_whitelist=None)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    assert req.RequiredNodes() == len(self.instance.all_nodes)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    self.op.nodes = ial.result
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 utils.CommaJoin(ial.result))

  def CheckArguments(self):
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
      # Normalize and convert deprecated list of disk indices
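      # e.g. a deprecated [2, 0] becomes [(0, {}), (2, {})]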
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]

    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
    if duplicates:
      raise errors.OpPrereqError("Some disks have been specified more than"
                                 " once: %s" % utils.CommaJoin(duplicates),
                                 errors.ECODE_INVAL)

    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
    # when neither iallocator nor nodes are specified
    if self.op.iallocator or self.op.nodes:
      _CheckIAllocatorOrNode(self, "iallocator", "nodes")

    for (idx, params) in self.op.disks:
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
      if unsupported:
        raise errors.OpPrereqError("Parameters for disk %s try to change"
                                   " unmodifyable parameter(s): %s" %
                                   (idx, utils.CommaJoin(unsupported)),
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    if self.op.nodes:
      self.op.nodes = [_ExpandNodeName(self.cfg, n) for n in self.op.nodes]
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      if self.op.iallocator:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.iallocator is not None
      assert not self.op.nodes
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock the primary group used by the instance optimistically; this
      # requires going via the node before it's locked, requiring
      # verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)

    elif level == locking.LEVEL_NODE:
      # If an allocator is used, then we lock all the nodes in the current
      # instance group, as we don't know yet which ones will be selected;
      # if we replace the nodes without using an allocator, locks are
      # already declared in ExpandNames; otherwise, we need to lock all the
      # instance nodes for disk re-creation
      if self.op.iallocator:
        assert not self.op.nodes
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1

        # Lock member nodes of the group of the primary node
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
          self.needed_locks[locking.LEVEL_NODE].extend(
            self.cfg.GetNodeGroup(group_uuid).members)

        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
      elif not self.op.nodes:
        self._LockInstancesNodes(primary_only=False)
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    return _BuildInstanceHookEnvByObject(self, self.instance)

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    if self.op.nodes:
      if len(self.op.nodes) != len(instance.all_nodes):
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
                                   " %d replacement nodes were specified" %
                                   (instance.name, len(instance.all_nodes),
                                    len(self.op.nodes)),
                                   errors.ECODE_INVAL)
      assert instance.disk_template != constants.DT_DRBD8 or \
          len(self.op.nodes) == 2
      assert instance.disk_template != constants.DT_PLAIN or \
          len(self.op.nodes) == 1
      primary_node = self.op.nodes[0]
    else:
      primary_node = instance.primary_node
    if not self.op.iallocator:
      _CheckNodeOnline(self, primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name, errors.ECODE_INVAL)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      # Node group locks are acquired only for the primary node (and only
      # when the allocator is used)
      _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
                               primary_only=True)

    # if we replace nodes *and* the old primary is offline, we don't
    # check the instance state
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
    if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
      _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                          msg="cannot recreate disks")

    if self.op.disks:
      self.disks = dict(self.op.disks)
    else:
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))

    maxidx = max(self.disks.keys())
    if maxidx >= len(instance.disks):
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
                                 errors.ECODE_INVAL)

    if ((self.op.nodes or self.op.iallocator) and
        sorted(self.disks.keys()) != range(len(instance.disks))):
      raise errors.OpPrereqError("Can't recreate disks partially and"
                                 " change the nodes at the same time",
                                 errors.ECODE_INVAL)

    self.instance = instance

    if self.op.iallocator:
      self._RunAllocator()
      # Release unneeded node and node resource locks
      _ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
      _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
      _ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)

    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

  def Exec(self, feedback_fn):
    """Recreate the disks.

    """
    instance = self.instance

    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    to_skip = []
    mods = [] # keeps track of needed changes

    for idx, disk in enumerate(instance.disks):
      try:
        changes = self.disks[idx]
      except KeyError:
        # Disk should not be recreated
        to_skip.append(idx)
        continue

      # update secondaries for disks, if needed
      if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
        # need to update the nodes and minors
        assert len(self.op.nodes) == 2
        assert len(disk.logical_id) == 6 # otherwise disk internals
                                         # have changed
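        # A DRBD8 logical_id is the 6-tuple (node_a, node_b, port, minor_a,
        # minor_b, secret); only the nodes and the minors are replaced here,
        # the port and the shared secret are kept.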
        (_, _, old_port, _, _, old_secret) = disk.logical_id
        new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
        new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
                  new_minors[0], new_minors[1], old_secret)
        assert len(disk.logical_id) == len(new_id)
      else:
        new_id = None

      mods.append((idx, new_id, changes))

    # now that we have passed all asserts above, we can apply the mods
    # in a single run (to avoid partial changes)
    for idx, new_id, changes in mods:
      disk = instance.disks[idx]
      if new_id is not None:
        assert disk.dev_type == constants.LD_DRBD8
        disk.logical_id = new_id
      if changes:
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
                    mode=changes.get(constants.IDISK_MODE, None))

    # change primary node, if needed
    if self.op.nodes:
      instance.primary_node = self.op.nodes[0]
      self.LogWarning("Changing the instance's nodes, you will have to"
                      " remove any disks left on the older nodes manually")

    if self.op.nodes:
      self.cfg.Update(instance, feedback_fn)

    # All touched nodes must be locked
    mylocks = self.owned_locks(locking.LEVEL_NODE)
    assert mylocks.issuperset(frozenset(instance.all_nodes))
    _CreateDisks(self, instance, to_skip=to_skip)


class LUInstanceRename(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE

  def CheckArguments(self):
    """Check arguments.

    """
    if self.op.ip_check and not self.op.name_check:
      # TODO: make the ip check more flexible and not depend on the name check
      raise errors.OpPrereqError("IP address check requires a name check",
                                 errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    self.op.instance_name = _ExpandInstanceName(self.cfg,
                                                self.op.instance_name)
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None
    _CheckNodeOnline(self, instance.primary_node)
    _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                        msg="cannot rename")
    self.instance = instance

    new_name = self.op.new_name
    if self.op.name_check:
      hostname = _CheckHostnameSane(self, new_name)
      new_name = self.op.new_name = hostname.name
      if (self.op.ip_check and
          netutils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT)):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname.ip, new_name),
                                   errors.ECODE_NOTUNIQUE)

    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list and new_name != instance.name:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name, errors.ECODE_EXISTS)

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    rename_file_storage = False
    if (inst.disk_template in constants.DTS_FILEBASED and
        self.op.new_name != inst.name):
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      rename_file_storage = True

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL.
    # Otherwise the new lock would have to be added in acquired mode.
    assert self.REQ_BGL
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER)
    self.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if rename_file_storage:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)
      result.Raise("Could not rename on node %s directory '%s' to '%s'"
                   " (but the instance has been renamed in Ganeti)" %
                   (inst.primary_node, old_file_storage_dir,
                    new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    # update info on disks
    info = _GetInstanceInfoText(inst)
    for (idx, disk) in enumerate(inst.disks):
      for node in inst.all_nodes:
        self.cfg.SetDiskID(disk, node)
        result = self.rpc.call_blockdev_setinfo(node, disk, info)
        if result.fail_msg:
          self.LogWarning("Error setting info on node %s for disk %s: %s",
                          node, idx, result.fail_msg)
    try:
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                                 old_name, self.op.debug_level)
      msg = result.fail_msg
      if msg:
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti): %s" %
               (inst.name, inst.primary_node, msg))
        self.LogWarning(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)

    return inst.name
