Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib / node.py @ 5e568fee

History | View | Annotate | Download (58.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with nodes."""
23

    
24
import logging
25
import operator
26

    
27
from ganeti import constants
28
from ganeti import errors
29
from ganeti import locking
30
from ganeti import netutils
31
from ganeti import objects
32
from ganeti import opcodes
33
from ganeti import qlang
34
from ganeti import query
35
from ganeti import rpc
36
from ganeti import utils
37
from ganeti.masterd import iallocator
38

    
39
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
40
  ResultWithJobs
41
from ganeti.cmdlib.common import CheckParamsNotGlobal, \
42
  MergeAndVerifyHvState, MergeAndVerifyDiskState, \
43
  IsExclusiveStorageEnabledNode, CheckNodePVs, \
44
  RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
45
  CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
46
  AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
47
  GetWantedNodes, MapInstanceDisksToNodes, RunPostHook, \
48
  FindFaultyInstanceDisks
49

    
50

    
51
def _DecideSelfPromotion(lu, exceptions=None):
52
  """Decide whether I should promote myself as a master candidate.
53

54
  """
55
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
56
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
57
  # the new node will increase mc_max with one, so:
58
  mc_should = min(mc_should + 1, cp_size)
59
  return mc_now < mc_should
60

    
61

    
62
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: L{objects.Node}
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip,
  and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  # a freshly added node has no UUID yet, so the RPC must address it by name
  result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node.name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if result.payload:
    return
  msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
         " please fix and re-run this command" % secondary_ip)
  if prereq:
    raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
  raise errors.OpExecError(msg)
90

    
91

    
92
class LUNodeAdd(LogicalUnit):
  """Logical unit for adding node to the cluster.

  Handles both adding a brand-new node and re-adding (C{op.readd}) a
  node that is already present in the configuration.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  # op attributes that may be None and are defaulted in CheckPrereq
  _NFLAGS = ["master_capable", "vm_capable"]

  def CheckArguments(self):
    """Check syntactic validity of the arguments.

    Resolves/normalizes the node name and rejects invalid readd
    combinations (readding the master, or readding with an explicit
    node group).

    """
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
    # validate/normalize the node name
    self.hostname = netutils.GetHostname(name=self.op.node_name,
                                         family=self.primary_ip_family)
    self.op.node_name = self.hostname.name

    if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
      raise errors.OpPrereqError("Cannot readd the master node",
                                 errors.ECODE_STATE)

    if self.op.readd and self.op.group:
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                 " being readded", errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    @rtype: dict
    @return: hook environment variables describing the node being added

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    @return: a (pre-nodes, post-nodes, post-node-names) tuple; the new
        node is listed by name only, as it has no UUID before being added

    """
    hook_nodes = self.cfg.GetNodeList()
    new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
    if new_node_info is not None:
      # Exclude added node
      hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))

    # add the new node as post hook node by name; it does not have an UUID yet
    return (hook_nodes, hook_nodes, [self.op.node_name, ])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.hostname.name
    self.op.primary_ip = self.hostname.ip
    if self.op.secondary_ip is None:
      # no secondary IP given: fall back to the primary, which is only
      # valid on IPv4 clusters (the secondary must always be IPv4)
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
                                   " IPv4 address must be given as secondary",
                                   errors.ECODE_INVAL)
      self.op.secondary_ip = self.op.primary_ip

    secondary_ip = self.op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip, errors.ECODE_INVAL)

    existing_node_info = self.cfg.GetNodeInfoByName(node_name)
    if not self.op.readd and existing_node_info is not None:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node_name, errors.ECODE_EXISTS)
    elif self.op.readd and existing_node_info is None:
      raise errors.OpPrereqError("Node %s is not in the configuration" %
                                 node_name, errors.ECODE_NOENT)

    self.changed_primary_ip = False

    # verify the new node's addresses against every configured node: the
    # readded node may change its primary IP, everything else must not clash
    for existing_node in self.cfg.GetAllNodesInfo().values():
      if self.op.readd and node_name == existing_node.name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != self.op.primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == self.op.primary_ip or
          existing_node.secondary_ip == self.op.primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # After this 'if' block, None is no longer a valid value for the
    # _capable op attributes
    if self.op.readd:
      assert existing_node_info is not None, \
        "Can't retrieve locked node %s" % node_name
      # on readd, unspecified flags keep their previous value
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, getattr(existing_node_info, attr))
    else:
      # on a fresh add, unspecified flags default to True
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, True)

    if self.op.readd and not self.op.vm_capable:
      pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % node_name,
                                   errors.ECODE_STATE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == self.op.primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                              source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    # the readded node itself must not count towards the candidate stats
    if self.op.readd:
      exceptions = [existing_node_info.uuid]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False

    if self.op.readd:
      # reuse the existing configuration object; Exec resets its flags
      self.new_node = existing_node_info
    else:
      node_group = self.cfg.LookupNodeGroup(self.op.group)
      self.new_node = objects.Node(name=node_name,
                                   primary_ip=self.op.primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=node_group, ndparams={})

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)

    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
    #       it a property on the base class.
    # The node is not yet in the configuration, so a DNS-only RPC runner is
    # needed to reach it.
    rpcrunner = rpc.DnsOnlyRunner()
    result = rpcrunner.call_version([node_name])[node_name]
    result.Raise("Can't get version information from node %s" % node_name)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node_name, result.payload)
    else:
      raise errors.OpPrereqError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.payload),
                                 errors.ECODE_ENVIRON)

    vg_name = self.cfg.GetVGName()
    if vg_name is not None:
      # verify the new node's physical volumes before accepting it
      vparams = {constants.NV_PVLIST: [vg_name]}
      excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
      cname = self.cfg.GetClusterName()
      result = rpcrunner.call_node_verify_light(
          [node_name], vparams, cname,
          self.cfg.GetClusterInfo().hvparams)[node_name]
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
      if errmsgs:
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    @param feedback_fn: callable used to report ssh/hostname verification
        failures to the job log

    """
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # We adding a new node so we assume it's powered
    self.new_node.powered = True

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      self.new_node.drained = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      self.new_node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        self.new_node.primary_ip = self.op.primary_ip

    # copy the master/vm_capable flags
    for attr in self._NFLAGS:
      setattr(self.new_node, attr, getattr(self.op, attr))

    # notify the user about any possible mc promotion
    if self.new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    if self.op.ndparams:
      self.new_node.ndparams = self.op.ndparams
    else:
      self.new_node.ndparams = {}

    if self.op.hv_state:
      self.new_node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.new_node.disk_state_static = self.new_disk_state

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(
                 master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
                 self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    if self.new_node.secondary_ip != self.new_node.primary_ip:
      # prereq=False: at this stage a failure is an execution error
      _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
                               False)

    # have the master verify ssh/hostname connectivity to the new node
    node_verifier_uuids = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: ([self.new_node.name], {}),
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(
               node_verifier_uuids, node_verify_param,
               self.cfg.GetClusterName(),
               self.cfg.GetClusterInfo().hvparams)
    for verifier in node_verifier_uuids:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed")

    if self.op.readd:
      self.context.ReaddNode(self.new_node)
      RedistributeAncillaryFiles(self)
      # make sure we redistribute the config
      self.cfg.Update(self.new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not self.new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
        result.Warn("Node failed to demote itself from master candidate status",
                    self.LogWarning)
    else:
      self.context.AddNode(self.new_node, self.proc.GetECId())
      RedistributeAncillaryFiles(self)
391

    
392

    
393
class LUNodeSetParams(LogicalUnit):
  """Modifies the parameters of a node.

  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
      to the node role (as _ROLE_*)
  @cvar _R2F: a dictionary from node role to tuples of flags
  @cvar _FLAGS: a list of attribute names corresponding to the flags

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
  _F2R = {
    (True, False, False): _ROLE_CANDIDATE,
    (False, True, False): _ROLE_DRAINED,
    (False, False, True): _ROLE_OFFLINE,
    (False, False, False): _ROLE_REGULAR,
    }
  _R2F = dict((v, k) for k, v in _F2R.items())
  _FLAGS = ["master_candidate", "drained", "offline"]

  def CheckArguments(self):
    """Check syntactic validity of the arguments.

    Requires at least one modification and at most one True role flag,
    and decides how much locking will be needed (C{lock_all},
    C{lock_instances}).

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
                self.op.master_capable, self.op.vm_capable,
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
                self.op.disk_state]
    if all_mods.count(None) == len(all_mods):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # Boolean value that tells us whether we might be demoting from MC
    self.might_demote = (self.op.master_candidate is False or
                         self.op.offline is True or
                         self.op.drained is True or
                         self.op.master_capable is False)

    if self.op.secondary_ip:
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                   " address" % self.op.secondary_ip,
                                   errors.ECODE_INVAL)

    # auto-promotion of other nodes requires all node locks
    self.lock_all = self.op.auto_promote and self.might_demote
    # changing the secondary IP affects mirrored instances on this node
    self.lock_instances = self.op.secondary_ip is not None

  def _InstanceFilter(self, instance):
    """Filter for getting affected instances.

    @return: True for internally-mirrored instances that use this node

    """
    return (instance.disk_template in constants.DTS_INT_MIRROR and
            self.op.node_uuid in instance.all_nodes)

  def ExpandNames(self):
    """Compute the lock set, based on the decisions of CheckArguments.

    """
    if self.lock_all:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,

        # Block allocations when all nodes are locked
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: self.op.node_uuid,
        }

    # Since modifying a node can have severe effects on currently running
    # operations the resource lock is at least acquired in shared mode
    self.needed_locks[locking.LEVEL_NODE_RES] = \
      self.needed_locks[locking.LEVEL_NODE]

    # Get all locks except nodes in shared mode; they are not used for anything
    # but read-only access
    self.share_locks = ShareAll()
    self.share_locks[locking.LEVEL_NODE] = 0
    self.share_locks[locking.LEVEL_NODE_RES] = 0
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 0

    if self.lock_instances:
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        frozenset(self.cfg.GetInstancesInfoByFilter(self._InstanceFilter))

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    @rtype: dict
    @return: hook environment variables describing the requested changes

    """
    return {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    @return: (pre-nodes, post-nodes): master plus the modified node

    """
    nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    if self.lock_instances:
      affected_instances = \
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)

      # Verify instance locks
      owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
      wanted_instances = frozenset(affected_instances.keys())
      if wanted_instances - owned_instances:
        raise errors.OpPrereqError("Instances affected by changing node %s's"
                                   " secondary IP address have changed since"
                                   " locks were acquired, wanted '%s', have"
                                   " '%s'; retry the operation" %
                                   (node.name,
                                    utils.CommaJoin(wanted_instances),
                                    utils.CommaJoin(owned_instances)),
                                   errors.ECODE_STATE)
    else:
      affected_instances = None

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if node.uuid == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via master-failover",
                                   errors.ECODE_INVAL)

    if self.op.master_candidate and not node.master_capable:
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
                                 " it a master candidate" % node.name,
                                 errors.ECODE_STATE)

    if self.op.vm_capable is False:
      (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
      if ipri or isec:
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
                                   " the vm_capable flag" % node.name,
                                   errors.ECODE_STATE)

    if node.master_candidate and self.might_demote and not self.lock_all:
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
      # check if after removing the current node, we're missing master
      # candidates
      (mc_remaining, mc_should, _) = \
          self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
      if mc_remaining < mc_should:
        raise errors.OpPrereqError("Not enough master candidates, please"
                                   " pass auto promote option to allow"
                                   " promotion (--auto-promote or RAPI"
                                   " auto_promote=True)", errors.ECODE_STATE)

    self.old_flags = old_flags = (node.master_candidate,
                                  node.drained, node.offline)
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
    self.old_role = old_role = self._F2R[old_flags]

    # Check for ineffective changes
    for attr in self._FLAGS:
      if getattr(self.op, attr) is False and getattr(node, attr) is False:
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
        setattr(self.op, attr, None)

    # Past this point, any flag change to False means a transition
    # away from the respective state, as only real changes are kept

    # TODO: We might query the real power state if it supports OOB
    if SupportsOob(self.cfg, node):
      if self.op.offline is False and not (node.powered or
                                           self.op.powered is True):
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
                                    " offline status can be reset") %
                                   self.op.node_name, errors.ECODE_STATE)
    elif self.op.powered is not None:
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
                                  " as it does not support out-of-band"
                                  " handling") % self.op.node_name,
                                 errors.ECODE_STATE)

    # If we're being deofflined/drained, we'll MC ourself if needed
    if (self.op.drained is False or self.op.offline is False or
        (self.op.master_capable and not node.master_capable)):
      if _DecideSelfPromotion(self):
        self.op.master_candidate = True
        self.LogInfo("Auto-promoting node to master candidate")

    # If we're no longer master capable, we'll demote ourselves from MC
    if self.op.master_capable is False and node.master_candidate:
      self.LogInfo("Demoting from master candidate")
      self.op.master_candidate = False

    # Compute new role
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
    if self.op.master_candidate:
      new_role = self._ROLE_CANDIDATE
    elif self.op.drained:
      new_role = self._ROLE_DRAINED
    elif self.op.offline:
      new_role = self._ROLE_OFFLINE
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
      # False is still in new flags, which means we're un-setting (the
      # only) True flag
      new_role = self._ROLE_REGULAR
    else: # no new flags, nothing, keep old role
      new_role = old_role

    self.new_role = new_role

    if old_role == self._ROLE_OFFLINE and new_role != old_role:
      # Trying to transition out of offline status
      result = self.rpc.call_version([node.uuid])[node.uuid]
      if result.fail_msg:
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
                                   " to report its version: %s" %
                                   (node.name, result.fail_msg),
                                   errors.ECODE_STATE)
      else:
        self.LogWarning("Transitioning node from offline to online state"
                        " without using re-add. Please make sure the node"
                        " is healthy!")

    # When changing the secondary ip, verify if this is a single-homed to
    # multi-homed transition or vice versa, and apply the relevant
    # restrictions.
    if self.op.secondary_ip:
      # Ok even without locking, because this can't be changed by any LU
      master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
      master_singlehomed = master.secondary_ip == master.primary_ip
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from single-homed to multi-homed"
                          " cluster; all nodes will require a secondary IP"
                          " address")
        else:
          raise errors.OpPrereqError("Changing the secondary ip on a"
                                     " single-homed cluster requires the"
                                     " --force option to be passed, and the"
                                     " target node to be the master",
                                     errors.ECODE_INVAL)
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from multi-homed to single-homed"
                          " cluster; secondary IP addresses will have to be"
                          " removed")
        else:
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
                                     " same as the primary IP on a multi-homed"
                                     " cluster, unless the --force option is"
                                     " passed, and the target node is the"
                                     " master", errors.ECODE_INVAL)

      assert not (frozenset(affected_instances) -
                  self.owned_locks(locking.LEVEL_INSTANCE))

      if node.offline:
        if affected_instances:
          msg = ("Cannot change secondary IP address: offline node has"
                 " instances (%s) configured to use it" %
                 utils.CommaJoin(affected_instances.keys()))
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
      else:
        # On online nodes, check that no instances are running, and that
        # the node has the new ip and we can reach it.
        for instance in affected_instances.values():
          CheckInstanceState(self, instance, INSTANCE_DOWN,
                             msg="cannot change secondary ip")

        _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
        if master.uuid != node.uuid:
          # check reachability from master secondary ip to new secondary ip
          if not netutils.TcpPing(self.op.secondary_ip,
                                  constants.DEFAULT_NODED_PORT,
                                  source=master.secondary_ip):
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                       " based ping to node daemon port",
                                       errors.ECODE_ENVIRON)

    if self.op.ndparams:
      new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")
      self.new_ndparams = new_ndparams

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                                node.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)

  def Exec(self, feedback_fn):
    """Modifies a node.

    @return: list of (attribute, new value) pairs of the changes applied

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    result = []

    if self.op.ndparams:
      node.ndparams = self.new_ndparams

    if self.op.powered is not None:
      node.powered = self.op.powered

    if self.op.hv_state:
      node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      node.disk_state_static = self.new_disk_state

    for attr in ["master_capable", "vm_capable"]:
      val = getattr(self.op, attr)
      if val is not None:
        setattr(node, attr, val)
        result.append((attr, str(val)))

    if self.new_role != self.old_role:
      # Tell the node to demote itself, if no longer MC and not offline
      if self.old_role == self._ROLE_CANDIDATE and \
          self.new_role != self._ROLE_OFFLINE:
        # NOTE(review): this passes node.name, while LUNodeAdd.Exec passes a
        # node UUID to the same RPC -- confirm which identifier
        # call_node_demote_from_mc expects
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s", msg)

      new_flags = self._R2F[self.new_role]
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
        if of != nf:
          result.append((desc, str(nf)))
      (node.master_candidate, node.drained, node.offline) = new_flags

      # we locked all nodes, we adjust the CP before updating this node
      if self.lock_all:
        AdjustCandidatePool(self, [node.uuid])

    if self.op.secondary_ip:
      node.secondary_ip = self.op.secondary_ip
      result.append(("secondary_ip", self.op.secondary_ip))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)

    # this will trigger job queue propagation or cleanup if the mc
    # flag changed
    if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
      self.context.ReaddNode(node)

    return result
758

    
759

    
760
class LUNodePowercycle(NoHooksLU):
  """Powercycles a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    # Powercycling the master node is refused unless the force flag is set
    target_is_master = (self.op.node_uuid == self.cfg.GetMasterNode())
    if target_is_master and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    hv_name = self.cfg.GetHypervisorType()
    hv_params = self.cfg.GetClusterInfo().hvparams[hv_name]
    rpc_result = self.rpc.call_node_powercycle(self.op.node_uuid, hv_name,
                                               hv_params)
    rpc_result.Raise("Failed to schedule the reboot")
    return rpc_result.payload
795

    
796

    
797
def _GetNodeInstancesInner(cfg, fn):
798
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
799

    
800

    
801
def _GetNodePrimaryInstances(cfg, node_uuid):
  """Returns primary instances on a node.

  """
  def _IsPrimaryHere(inst):
    # An instance qualifies iff the given node is its primary
    return node_uuid == inst.primary_node

  return _GetNodeInstancesInner(cfg, _IsPrimaryHere)
807

    
808

    
809
def _GetNodeSecondaryInstances(cfg, node_uuid):
  """Returns secondary instances on a node.

  """
  def _IsSecondaryHere(inst):
    # An instance qualifies iff the given node is among its secondaries
    return node_uuid in inst.secondary_nodes

  return _GetNodeInstancesInner(cfg, _IsSecondaryHere)
815

    
816

    
817
def _GetNodeInstances(cfg, node_uuid):
  """Returns a list of all primary and secondary instances on a node.

  """
  def _UsesNode(inst):
    # "all_nodes" covers the primary as well as all secondaries
    return node_uuid in inst.all_nodes

  return _GetNodeInstancesInner(cfg, _UsesNode)
823

    
824

    
825
class LUNodeEvacuate(NoHooksLU):
  """Evacuates instances off a list of nodes.

  Depending on the requested mode, either the primary instances, the
  secondary instances or all instances of the node are moved away, either
  via an iallocator or (for secondaries only) to an explicitly given
  remote node.  The LU itself only computes and returns the jobs doing
  the actual moves (see L{ResultWithJobs}).

  """
  REQ_BGL = False

  # Maps opcode evacuation modes to the corresponding iallocator request modes
  _MODE2IALLOCATOR = {
    constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI,
    constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC,
    constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL,
    }
  assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES
  assert (frozenset(_MODE2IALLOCATOR.values()) ==
          constants.IALLOCATOR_NEVAC_MODES)

  def CheckArguments(self):
    # Exactly one of "iallocator" and "remote_node" must be given
    CheckIAllocatorOrNode(self, "iallocator", "remote_node")

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)
      assert self.op.remote_node

      if self.op.node_uuid == self.op.remote_node_uuid:
        raise errors.OpPrereqError("Can not use evacuated node as a new"
                                   " secondary node", errors.ECODE_INVAL)

      # An explicit remote node only makes sense for secondary evacuation
      if self.op.mode != constants.NODE_EVAC_SEC:
        raise errors.OpPrereqError("Without the use of an iallocator only"
                                   " secondary instances can be evacuated",
                                   errors.ECODE_INVAL)

    # Declare locks
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

    # Determine nodes (via group) optimistically, needs verification once locks
    # have been acquired
    self.lock_nodes = self._DetermineNodes()

  def _DetermineNodes(self):
    """Gets the list of node UUIDs to operate on.

    @rtype: set
    @return: UUIDs of the evacuated node plus all possible target nodes

    """
    if self.op.remote_node is None:
      # Iallocator will choose any node(s) in the same group
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
    else:
      group_nodes = frozenset([self.op.remote_node_uuid])

    # Determine nodes to be locked
    return set([self.op.node_uuid]) | group_nodes

  def _DetermineInstances(self):
    """Builds list of instances to operate on.

    @return: instances on the node selected by C{self.op.mode}

    """
    assert self.op.mode in constants.NODE_EVAC_MODES

    if self.op.mode == constants.NODE_EVAC_PRI:
      # Primary instances only
      inst_fn = _GetNodePrimaryInstances
      assert self.op.remote_node is None, \
        "Evacuating primary instances requires iallocator"
    elif self.op.mode == constants.NODE_EVAC_SEC:
      # Secondary instances only
      inst_fn = _GetNodeSecondaryInstances
    else:
      # All instances
      assert self.op.mode == constants.NODE_EVAC_ALL
      inst_fn = _GetNodeInstances
      # TODO: In 2.6, change the iallocator interface to take an evacuation mode
      # per instance
      raise errors.OpPrereqError("Due to an issue with the iallocator"
                                 " interface it is not possible to evacuate"
                                 " all instances at once; specify explicitly"
                                 " whether to evacuate primary or secondary"
                                 " instances",
                                 errors.ECODE_INVAL)

    return inst_fn(self.cfg, self.op.node_uuid)

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        set(i.name for i in self._DetermineInstances())

    elif level == locking.LEVEL_NODEGROUP:
      # Lock node groups for all potential target nodes optimistically, needs
      # verification once nodes have been acquired
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)

    elif level == locking.LEVEL_NODE:
      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes

  def CheckPrereq(self):
    # Verify locks: since nodes, groups and instances were only determined
    # optimistically in ExpandNames/DeclareLocks, re-check them now that the
    # locks are actually held and abort if the cluster changed in between
    owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)

    need_nodes = self._DetermineNodes()

    if not owned_nodes.issuperset(need_nodes):
      # FIX: the message previously read "current nodes are are '%s'"
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
                                 " locks were acquired, current nodes are"
                                 " '%s', used to be '%s'; retry the"
                                 " operation" %
                                 (self.op.node_name,
                                  utils.CommaJoin(need_nodes),
                                  utils.CommaJoin(owned_nodes)),
                                 errors.ECODE_STATE)

    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
    if owned_groups != wanted_groups:
      raise errors.OpExecError("Node groups changed since locks were acquired,"
                               " current groups are '%s', used to be '%s';"
                               " retry the operation" %
                               (utils.CommaJoin(wanted_groups),
                                utils.CommaJoin(owned_groups)))

    # Determine affected instances
    self.instances = self._DetermineInstances()
    self.instance_names = [i.name for i in self.instances]

    if set(self.instance_names) != owned_instances:
      raise errors.OpExecError("Instances on node '%s' changed since locks"
                               " were acquired, current instances are '%s',"
                               " used to be '%s'; retry the operation" %
                               (self.op.node_name,
                                utils.CommaJoin(self.instance_names),
                                utils.CommaJoin(owned_instances)))

    if self.instance_names:
      self.LogInfo("Evacuating instances from node '%s': %s",
                   self.op.node_name,
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
    else:
      self.LogInfo("No instances to evacuate from node '%s'",
                   self.op.node_name)

    if self.op.remote_node is not None:
      # The chosen target may not be the primary of any affected instance
      for i in self.instances:
        if i.primary_node == self.op.remote_node_uuid:
          raise errors.OpPrereqError("Node %s is the primary node of"
                                     " instance %s, cannot use it as"
                                     " secondary" %
                                     (self.op.remote_node, i.name),
                                     errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    # CheckArguments guarantees exactly one of the two is set
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)

    if not self.instance_names:
      # No instances to evacuate
      jobs = []

    elif self.op.iallocator is not None:
      # TODO: Implement relocation to other group
      evac_mode = self._MODE2IALLOCATOR[self.op.mode]
      req = iallocator.IAReqNodeEvac(evac_mode=evac_mode,
                                     instances=list(self.instance_names))
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)

      ial.Run(self.op.iallocator)

      if not ial.success:
        raise errors.OpPrereqError("Can't compute node evacuation using"
                                   " iallocator '%s': %s" %
                                   (self.op.iallocator, ial.info),
                                   errors.ECODE_NORES)

      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)

    elif self.op.remote_node is not None:
      assert self.op.mode == constants.NODE_EVAC_SEC
      # One replace-disks job (changing the secondary) per instance
      jobs = [
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
                                        remote_node=self.op.remote_node,
                                        disks=[],
                                        mode=constants.REPLACE_DISK_CHG,
                                        early_release=self.op.early_release)]
        for instance_name in self.instance_names]

    else:
      raise errors.ProgrammerError("No iallocator or remote node")

    return ResultWithJobs(jobs)
1025

    
1026

    
1027
class LUNodeMigrate(LogicalUnit):
  """Migrate all instances from a node.

  """
  HPATH = "node-migrate"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False

  def CheckArguments(self):
    pass

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NODE_NAME": self.op.node_name,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      }
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    master_only = [self.cfg.GetMasterNode()]
    return (master_only, master_only)

  def CheckPrereq(self):
    pass

  def Exec(self, feedback_fn):
    # One single-opcode job per primary instance on the evacuated node
    jobs = []
    for instance in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid):
      migrate_op = opcodes.OpInstanceMigrate(
        instance_name=instance.name,
        mode=self.op.mode,
        live=self.op.live,
        iallocator=self.op.iallocator,
        target_node=self.op.target_node,
        allow_runtime_changes=self.op.allow_runtime_changes,
        ignore_ipolicy=self.op.ignore_ipolicy)
      jobs.append([migrate_op])

    # TODO: Run iallocator in this opcode and pass correct placement options to
    # OpInstanceMigrate. Since other jobs can modify the cluster between
    # running the iallocator and the actual migration, a good consistency model
    # will have to be found.

    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset([self.op.node_uuid]))

    return ResultWithJobs(jobs)
1090

    
1091

    
1092
def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Only file storage is special; every other type takes no extra arguments
  if storage_type != constants.ST_FILE:
    return []

  # storage.FileStorage wants a list of storage directories
  return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1102

    
1103

    
1104
class LUNodeModifyStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    # Only some storage types have modifiable fields at all
    if storage_type not in constants.MODIFIABLE_STORAGE_FIELDS:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]

    # Reject any requested change outside the modifiable field set
    unsupported = set(self.op.changes.keys()) - modifiable
    if unsupported:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(unsupported)),
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_uuid,
      }

  def Exec(self, feedback_fn):
    """Modifies a storage unit on the target node.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    rpc_result = self.rpc.call_storage_modify(self.op.node_uuid,
                                              self.op.storage_type, st_args,
                                              self.op.name, self.op.changes)
    rpc_result.Raise("Failed to modify storage unit '%s' on %s" %
                     (self.op.name, self.op.node_name))
1145

    
1146

    
1147
class NodeQuery(QueryBase):
  """Query implementation for nodes (see L{query.NODE_FIELDS}).

  """
  FIELDS = query.NODE_FIELDS

  def ExpandNames(self, lu):
    lu.needed_locks = {}
    lu.share_locks = ShareAll()

    if self.names:
      (self.wanted, _) = GetWantedNodes(lu, self.names)
    else:
      self.wanted = locking.ALL_SET

    # Locks are only needed when live (RPC-gathered) data was requested
    self.do_locking = (self.use_locking and
                       query.NQ_LIVE in self.requested_data)

    if self.do_locking:
      # If any non-static field is requested we need to lock the nodes
      lu.needed_locks[locking.LEVEL_NODE] = self.wanted
      lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    Gathers, depending on C{self.requested_data}: live node information
    via RPC (NQ_LIVE), instance-to-node mappings (NQ_INST), out-of-band
    support flags (NQ_OOB) and node group data (NQ_GROUP), and wraps
    everything in a L{query.NodeQueryData} object.

    """
    all_info = lu.cfg.GetAllNodesInfo()

    node_uuids = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)

    # Gather data as requested
    if query.NQ_LIVE in self.requested_data:
      # filter out non-vm_capable nodes
      toquery_node_uuids = [node.uuid for node in all_info.values()
                            if node.vm_capable and node.uuid in node_uuids]

      es_flags = rpc.GetExclusiveStorageForNodes(lu.cfg, toquery_node_uuids)
      # FIXME: This currently maps everything to lvm, this should be more
      # flexible
      lvm_enabled = utils.storage.IsLvmEnabled(
          lu.cfg.GetClusterInfo().enabled_disk_templates)
      storage_units = utils.storage.GetStorageUnitsOfCluster(
          lu.cfg, include_spindles=True)
      default_hypervisor = lu.cfg.GetHypervisorType()
      hvparams = lu.cfg.GetClusterInfo().hvparams[default_hypervisor]
      hvspecs = [(default_hypervisor, hvparams)]
      node_data = lu.rpc.call_node_info(toquery_node_uuids, storage_units,
                                        hvspecs, es_flags)
      # Nodes whose RPC failed or returned nothing are simply left out of
      # the live data; their static fields are still reported
      live_data = dict(
          (uuid, rpc.MakeLegacyNodeInfo(nresult.payload,
                                        require_vg_info=lvm_enabled))
          for (uuid, nresult) in node_data.items()
          if not nresult.fail_msg and nresult.payload)
    else:
      live_data = None

    if query.NQ_INST in self.requested_data:
      # Map each queried node to the sets of instance names using it as
      # primary resp. secondary node
      node_to_primary = dict([(uuid, set()) for uuid in node_uuids])
      node_to_secondary = dict([(uuid, set()) for uuid in node_uuids])

      inst_data = lu.cfg.GetAllInstancesInfo()

      for inst in inst_data.values():
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)
    else:
      node_to_primary = None
      node_to_secondary = None

    if query.NQ_OOB in self.requested_data:
      oob_support = dict((uuid, bool(SupportsOob(lu.cfg, node)))
                         for uuid, node in all_info.iteritems())
    else:
      oob_support = None

    if query.NQ_GROUP in self.requested_data:
      groups = lu.cfg.GetAllNodeGroupsInfo()
    else:
      groups = {}

    return query.NodeQueryData([all_info[uuid] for uuid in node_uuids],
                               live_data, lu.cfg.GetMasterNode(),
                               node_to_primary, node_to_secondary, groups,
                               oob_support, lu.cfg.GetClusterInfo())
1235

    
1236

    
1237
class LUNodeQuery(NoHooksLU):
  """Logical unit for querying nodes.

  """
  # pylint: disable=W0142
  REQ_BGL = False

  def CheckArguments(self):
    # All query mechanics are delegated to a NodeQuery instance
    name_filter = qlang.MakeSimpleFilter("name", self.op.names)
    self.nq = NodeQuery(name_filter, self.op.output_fields,
                        self.op.use_locking)

  def ExpandNames(self):
    self.nq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.nq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    return self.nq.OldStyleQuery(self)
1256

    
1257

    
1258
def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  # A field is valid when it matches either the static or the dynamic set
  valid = utils.FieldSet()
  valid.Extend(static)
  valid.Extend(dynamic)

  unknown = valid.NonMatching(selected)
  if unknown:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(unknown), errors.ECODE_INVAL)
1275

    
1276

    
1277
class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False
  # Fields answered live by the nodes vs. fields known statically to the LU
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      # No explicit node list: query all nodes
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    @return: list of rows, one per volume, each row being the string
        values of the requested output fields

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    volumes = self.rpc.call_node_volumes(node_uuids)

    # Map (node, volume) back to the owning instance for the "instance" field
    ilist = self.cfg.GetAllInstancesInfo()
    vol2inst = MapInstanceDisksToNodes(ilist.values())

    output = []
    for node_uuid in node_uuids:
      nresult = volumes[node_uuid]
      # Offline nodes are silently skipped; RPC failures only warn
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue

      node_vols = sorted(nresult.payload,
                         key=operator.itemgetter("dev"))

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = self.cfg.GetNodeName(node_uuid)
          elif field == "phys":
            val = vol["dev"]
          elif field == "vg":
            val = vol["vg"]
          elif field == "name":
            val = vol["name"]
          elif field == "size":
            val = int(float(vol["size"]))
          elif field == "instance":
            # "-" marks volumes not belonging to any instance
            val = vol2inst.get((node_uuid, vol["vg"] + "/" + vol["name"]), "-")
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output
1349

    
1350

    
1351
class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      # No explicit node list: query all nodes
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    @return: list of rows with the requested output fields, sorted by
        node and by storage unit name

    """
    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    # Map field name -> column index within the RPC result rows
    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      nresult = data[node_uuid]
      # Offline nodes are silently skipped; RPC failures only warn
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s",
                        node_name, msg)
        continue

      # Index the returned rows by unit name so they can be emitted sorted
      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node_name
          elif field == constants.SF_TYPE:
            val = self.op.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result
1437

    
1438

    
1439
class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_uuid)
    except ValueError:
      pass
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    assert node is not None

    masternode = self.cfg.GetMasterNode()
    if node.uuid == masternode:
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)

    # The node must not be referenced by any instance, neither as primary
    # nor as secondary
    for instance_name, instance in self.cfg.GetAllInstancesInfo().items():
      if node.uuid in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first" % instance_name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    logging.info("Stopping the node daemon and removing configs from node %s",
                 self.node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # Promote nodes to master candidate as needed
    AdjustCandidatePool(self, exceptions=[self.node.uuid])
    self.context.RemoveNode(self.node)

    # Run post hooks on the node before it's removed
    RunPostHook(self, self.node.name)

    # we have to call this by name rather than by UUID, as the node is no longer
    # in the config
    result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      # Best-effort: removal proceeds even if the node fails to clean up
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node_uuid = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node_uuid,
                                              constants.ETC_HOSTS_REMOVE,
                                              self.node.name, None)
      result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)
1533

    
1534

    
1535
class LURepairNodeStorage(NoHooksLU):
1536
  """Repairs the volume group on a node.
1537

1538
  """
1539
  REQ_BGL = False
1540

    
1541
  def CheckArguments(self):
    """Resolve the target node and verify the storage type is repairable.

    @raise errors.OpPrereqError: if the given storage type does not
        support the fix-consistency operation

    """
    self.op.node_uuid, self.op.node_name = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    # Only storage types listing SO_FIX_CONSISTENCY among their valid
    # operations can be repaired
    supported_ops = constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])
    if constants.SO_FIX_CONSISTENCY not in supported_ops:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " repaired" % storage_type,
                                 errors.ECODE_INVAL)
  def ExpandNames(self):
    """Lock only the node whose storage is being repaired."""
    self.needed_locks = {locking.LEVEL_NODE: [self.op.node_uuid]}
  def _CheckFaultyDisks(self, instance, node_uuid):
    """Ensure faulty disks abort the opcode or at least warn.

    @param instance: the instance whose disks are checked
    @param node_uuid: UUID of the node on which to look for faulty disks
    @raise errors.OpPrereqError: if faulty disks are found and
        C{self.op.ignore_consistency} is not set

    """
    try:
      if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                 node_uuid, True):
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                   " node '%s'" %
                                   (instance.name,
                                    self.cfg.GetNodeName(node_uuid)),
                                   errors.ECODE_STATE)
    # "except E as err" instead of the Python-2-only "except E, err"
    # form; "as" is valid from Python 2.6 and required on Python 3
    except errors.OpPrereqError as err:
      if self.op.ignore_consistency:
        # downgrade the failure to a warning when the caller asked to
        # ignore consistency problems
        self.LogWarning(str(err.args[0]))
      else:
        raise
  def CheckPrereq(self):
    """Check prerequisites.

    Aborts (via L{_CheckFaultyDisks}) if any instance living on the
    node under repair has faulty disks on one of its other nodes.

    """
    node_uuid = self.op.node_uuid
    for inst in _GetNodeInstances(self.cfg, node_uuid):
      if not inst.disks_active:
        continue
      # look at every node the instance spans except the one being
      # repaired itself
      peer_uuids = set(inst.all_nodes) - set([node_uuid])
      for peer_uuid in peer_uuids:
        self._CheckFaultyDisks(inst, peer_uuid)
  def Exec(self, feedback_fn):
    """Run the storage repair operation on the remote node.

    @param feedback_fn: callable used to report progress to the caller

    """
    unit_name = self.op.name
    node_name = self.op.node_name
    feedback_fn("Repairing storage unit '%s' on %s ..." %
                (unit_name, node_name))

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_execute(self.op.node_uuid,
                                           self.op.storage_type, st_args,
                                           unit_name,
                                           constants.SO_FIX_CONSISTENCY)
    result.Raise("Failed to repair storage unit '%s' on %s" %
                 (unit_name, node_name))