#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with nodes."""

import logging
import operator

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import qlang
from ganeti import query
from ganeti import rpc
from ganeti import utils
from ganeti.masterd import iallocator

from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
  ResultWithJobs
from ganeti.cmdlib.common import CheckParamsNotGlobal, \
  MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  IsExclusiveStorageEnabledNode, CheckNodePVs, \
  RedistributeAncillaryFiles, ExpandNodeName, ShareAll, SupportsOob, \
  CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
  AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
  GetWantedNodes, MapInstanceDisksToNodes, RunPostHook, \
  FindFaultyInstanceDisks


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max with one, so:
  mc_should = min(mc_should + 1, cp_size)
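  # Illustrative example (numbers assumed, not taken from a real cluster):
  # with candidate_pool_size=10, mc_now=3 and mc_should=3, the line above
  # raises the target to min(3 + 1, 10) = 4, so 3 < 4 and we self-promote.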
  return mc_now < mc_should


def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: string
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)


class LUNodeAdd(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _NFLAGS = ["master_capable", "vm_capable"]

  def CheckArguments(self):
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
    # validate/normalize the node name
    self.hostname = netutils.GetHostname(name=self.op.node_name,
                                         family=self.primary_ip_family)
    self.op.node_name = self.hostname.name

    if self.op.readd and self.op.node_name == self.cfg.GetMasterNode():
      raise errors.OpPrereqError("Cannot readd the master node",
                                 errors.ECODE_STATE)

    if self.op.readd and self.op.group:
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                 " being readded", errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    # Exclude added node
    pre_nodes = list(set(self.cfg.GetNodeList()) - set([self.op.node_name]))
    post_nodes = pre_nodes + [self.op.node_name, ]

    return (pre_nodes, post_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    cfg = self.cfg
    hostname = self.hostname
    node = hostname.name
    primary_ip = self.op.primary_ip = hostname.ip
    if self.op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
                                   " IPv4 address must be given as secondary",
                                   errors.ECODE_INVAL)
      self.op.secondary_ip = primary_ip

    secondary_ip = self.op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip, errors.ECODE_INVAL)

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node, errors.ECODE_EXISTS)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node,
                                 errors.ECODE_NOENT)

    self.changed_primary_ip = False

    for existing_node_name, existing_node in cfg.GetMultiNodeInfo(node_list):
      if self.op.readd and node == existing_node_name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # After this 'if' block, None is no longer a valid value for the
    # _capable op attributes
    if self.op.readd:
      old_node = self.cfg.GetNodeInfo(node)
      assert old_node is not None, "Can't retrieve locked node %s" % node
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, getattr(old_node, attr))
    else:
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, True)

    if self.op.readd and not self.op.vm_capable:
      pri, sec = cfg.GetNodeInstances(node)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % node,
                                   errors.ECODE_STATE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not netutils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                              source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [node]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False

    if self.op.readd:
      self.new_node = old_node
    else:
      node_group = cfg.LookupNodeGroup(self.op.group)
      self.new_node = objects.Node(name=node,
                                   primary_ip=primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=node_group, ndparams={})

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)

    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
    #       it a property on the base class.
    rpcrunner = rpc.DnsOnlyRunner()
    result = rpcrunner.call_version([node])[node]
    result.Raise("Can't get version information from node %s" % node)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node, result.payload)
    else:
      raise errors.OpPrereqError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.payload),
                                 errors.ECODE_ENVIRON)

    vg_name = cfg.GetVGName()
    if vg_name is not None:
      vparams = {constants.NV_PVLIST: [vg_name]}
      excl_stor = IsExclusiveStorageEnabledNode(cfg, self.new_node)
      cname = self.cfg.GetClusterName()
      result = rpcrunner.call_node_verify_light(
          [node], vparams, cname, cfg.GetClusterInfo().hvparams)[node]
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
      if errmsgs:
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # We are adding a new node, so we assume it's powered
    new_node.powered = True

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      new_node.drained = new_node.offline = False # pylint: disable=W0201
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      new_node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        new_node.primary_ip = self.op.primary_ip

    # copy the master/vm_capable flags
    for attr in self._NFLAGS:
      setattr(new_node, attr, getattr(self.op, attr))

    # notify the user about any possible mc promotion
    if new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    if self.op.ndparams:
      new_node.ndparams = self.op.ndparams
    else:
      new_node.ndparams = {}

    if self.op.hv_state:
      new_node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      new_node.disk_state_static = self.new_disk_state

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node,
                                              constants.ETC_HOSTS_ADD,
                                              self.hostname.name,
                                              self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    if new_node.secondary_ip != new_node.primary_ip:
      _CheckNodeHasSecondaryIP(self, new_node.name, new_node.secondary_ip,
                               False)

    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: ([node], {}),
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName(),
                                       self.cfg.GetClusterInfo().hvparams)
    for verifier in node_verify_list:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed")

    if self.op.readd:
      RedistributeAncillaryFiles(self)
      self.context.ReaddNode(new_node)
      # make sure we redistribute the config
      self.cfg.Update(new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(new_node.name)
        result.Warn("Node failed to demote itself from master candidate status",
                    self.LogWarning)
    else:
      RedistributeAncillaryFiles(self, additional_nodes=[node],
                                 additional_vm=self.op.vm_capable)
      self.context.AddNode(new_node, self.proc.GetECId())


class LUNodeSetParams(LogicalUnit):
  """Modifies the parameters of a node.

  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
      to the node role (as _ROLE_*)
  @cvar _R2F: a dictionary from node role to tuples of flags
  @cvar _FLAGS: a list of attribute names corresponding to the flags

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
  _F2R = {
    (True, False, False): _ROLE_CANDIDATE,
    (False, True, False): _ROLE_DRAINED,
    (False, False, True): _ROLE_OFFLINE,
    (False, False, False): _ROLE_REGULAR,
    }
  _R2F = dict((v, k) for k, v in _F2R.items())
  _FLAGS = ["master_candidate", "drained", "offline"]
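  # The flag tuples above and _FLAGS share the same ordering
  # (master_candidate, drained, offline); Exec() relies on this when zipping
  # old and new flags against _FLAGS to report changes.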

  def CheckArguments(self):
    self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name)
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
                self.op.master_capable, self.op.vm_capable,
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
                self.op.disk_state]
    if all_mods.count(None) == len(all_mods):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # Boolean value that tells us whether we might be demoting from MC
    self.might_demote = (self.op.master_candidate is False or
                         self.op.offline is True or
                         self.op.drained is True or
                         self.op.master_capable is False)

    if self.op.secondary_ip:
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                   " address" % self.op.secondary_ip,
                                   errors.ECODE_INVAL)

    self.lock_all = self.op.auto_promote and self.might_demote
    self.lock_instances = self.op.secondary_ip is not None

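  # Used by the secondary-IP handling: only instances whose disk template is
  # internally mirrored (e.g. DRBD) depend on their nodes' secondary IPs, so
  # only those count as affected.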
443
  def _InstanceFilter(self, instance):
444
    """Filter for getting affected instances.
445

446
    """
447
    return (instance.disk_template in constants.DTS_INT_MIRROR and
448
            self.op.node_name in instance.all_nodes)
449

    
450
  def ExpandNames(self):
451
    if self.lock_all:
452
      self.needed_locks = {
453
        locking.LEVEL_NODE: locking.ALL_SET,
454

    
455
        # Block allocations when all nodes are locked
456
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
457
        }
458
    else:
459
      self.needed_locks = {
460
        locking.LEVEL_NODE: self.op.node_name,
461
        }
462

    
463
    # Since modifying a node can have severe effects on currently running
464
    # operations the resource lock is at least acquired in shared mode
465
    self.needed_locks[locking.LEVEL_NODE_RES] = \
466
      self.needed_locks[locking.LEVEL_NODE]
467

    
468
    # Get all locks except nodes in shared mode; they are not used for anything
469
    # but read-only access
470
    self.share_locks = ShareAll()
471
    self.share_locks[locking.LEVEL_NODE] = 0
472
    self.share_locks[locking.LEVEL_NODE_RES] = 0
473
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 0
474

    
475
    if self.lock_instances:
476
      self.needed_locks[locking.LEVEL_INSTANCE] = \
477
        frozenset(self.cfg.GetInstancesInfoByFilter(self._InstanceFilter))
478

    
479
  def BuildHooksEnv(self):
480
    """Build hooks env.
481

482
    This runs on the master node.
483

484
    """
485
    return {
486
      "OP_TARGET": self.op.node_name,
487
      "MASTER_CANDIDATE": str(self.op.master_candidate),
488
      "OFFLINE": str(self.op.offline),
489
      "DRAINED": str(self.op.drained),
490
      "MASTER_CAPABLE": str(self.op.master_capable),
491
      "VM_CAPABLE": str(self.op.vm_capable),
492
      }
493

    
494
  def BuildHooksNodes(self):
495
    """Build hooks nodes.
496

497
    """
498
    nl = [self.cfg.GetMasterNode(), self.op.node_name]
499
    return (nl, nl)
500

    
501
  def CheckPrereq(self):
502
    """Check prerequisites.
503

504
    This only checks the instance list against the existing names.
505

506
    """
507
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
508

    
509
    if self.lock_instances:
510
      affected_instances = \
511
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
512

    
513
      # Verify instance locks
514
      owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
515
      wanted_instances = frozenset(affected_instances.keys())
516
      if wanted_instances - owned_instances:
517
        raise errors.OpPrereqError("Instances affected by changing node %s's"
518
                                   " secondary IP address have changed since"
519
                                   " locks were acquired, wanted '%s', have"
520
                                   " '%s'; retry the operation" %
521
                                   (self.op.node_name,
522
                                    utils.CommaJoin(wanted_instances),
523
                                    utils.CommaJoin(owned_instances)),
524
                                   errors.ECODE_STATE)
525
    else:
526
      affected_instances = None
527

    
528
    if (self.op.master_candidate is not None or
529
        self.op.drained is not None or
530
        self.op.offline is not None):
531
      # we can't change the master's node flags
532
      if self.op.node_name == self.cfg.GetMasterNode():
533
        raise errors.OpPrereqError("The master role can be changed"
534
                                   " only via master-failover",
535
                                   errors.ECODE_INVAL)
536

    
537
    if self.op.master_candidate and not node.master_capable:
538
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
539
                                 " it a master candidate" % node.name,
540
                                 errors.ECODE_STATE)
541

    
542
    if self.op.vm_capable is False:
543
      (ipri, isec) = self.cfg.GetNodeInstances(self.op.node_name)
544
      if ipri or isec:
545
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
546
                                   " the vm_capable flag" % node.name,
547
                                   errors.ECODE_STATE)
548

    
549
    if node.master_candidate and self.might_demote and not self.lock_all:
550
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
551
      # check if after removing the current node, we're missing master
552
      # candidates
553
      (mc_remaining, mc_should, _) = \
554
          self.cfg.GetMasterCandidateStats(exceptions=[node.name])
555
      if mc_remaining < mc_should:
556
        raise errors.OpPrereqError("Not enough master candidates, please"
557
                                   " pass auto promote option to allow"
558
                                   " promotion (--auto-promote or RAPI"
559
                                   " auto_promote=True)", errors.ECODE_STATE)
560

    
561
    self.old_flags = old_flags = (node.master_candidate,
562
                                  node.drained, node.offline)
563
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
564
    self.old_role = old_role = self._F2R[old_flags]
565

    
566
    # Check for ineffective changes
567
    for attr in self._FLAGS:
568
      if (getattr(self.op, attr) is False and getattr(node, attr) is False):
569
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
570
        setattr(self.op, attr, None)
571

    
572
    # Past this point, any flag change to False means a transition
573
    # away from the respective state, as only real changes are kept
574

    
575
    # TODO: We might query the real power state if it supports OOB
576
    if SupportsOob(self.cfg, node):
577
      if self.op.offline is False and not (node.powered or
578
                                           self.op.powered is True):
579
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
580
                                    " offline status can be reset") %
581
                                   self.op.node_name, errors.ECODE_STATE)
582
    elif self.op.powered is not None:
583
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
584
                                  " as it does not support out-of-band"
585
                                  " handling") % self.op.node_name,
586
                                 errors.ECODE_STATE)
587

    
588
    # If we're being deofflined/drained, we'll MC ourself if needed
589
    if (self.op.drained is False or self.op.offline is False or
590
        (self.op.master_capable and not node.master_capable)):
591
      if _DecideSelfPromotion(self):
592
        self.op.master_candidate = True
593
        self.LogInfo("Auto-promoting node to master candidate")
594

    
595
    # If we're no longer master capable, we'll demote ourselves from MC
596
    if self.op.master_capable is False and node.master_candidate:
597
      self.LogInfo("Demoting from master candidate")
598
      self.op.master_candidate = False
599

    
600
    # Compute new role
601
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
602
    if self.op.master_candidate:
603
      new_role = self._ROLE_CANDIDATE
604
    elif self.op.drained:
605
      new_role = self._ROLE_DRAINED
606
    elif self.op.offline:
607
      new_role = self._ROLE_OFFLINE
608
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
609
      # False is still in new flags, which means we're un-setting (the
610
      # only) True flag
611
      new_role = self._ROLE_REGULAR
612
    else: # no new flags, nothing, keep old role
613
      new_role = old_role
614

    
615
    self.new_role = new_role
616

    
617
    if old_role == self._ROLE_OFFLINE and new_role != old_role:
618
      # Trying to transition out of offline status
619
      result = self.rpc.call_version([node.name])[node.name]
620
      if result.fail_msg:
621
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
622
                                   " to report its version: %s" %
623
                                   (node.name, result.fail_msg),
624
                                   errors.ECODE_STATE)
625
      else:
626
        self.LogWarning("Transitioning node from offline to online state"
627
                        " without using re-add. Please make sure the node"
628
                        " is healthy!")
629

    
630
    # When changing the secondary ip, verify if this is a single-homed to
631
    # multi-homed transition or vice versa, and apply the relevant
632
    # restrictions.
633
    if self.op.secondary_ip:
634
      # Ok even without locking, because this can't be changed by any LU
635
      master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
636
      master_singlehomed = master.secondary_ip == master.primary_ip
637
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
638
        if self.op.force and node.name == master.name:
639
          self.LogWarning("Transitioning from single-homed to multi-homed"
640
                          " cluster; all nodes will require a secondary IP"
641
                          " address")
642
        else:
643
          raise errors.OpPrereqError("Changing the secondary ip on a"
644
                                     " single-homed cluster requires the"
645
                                     " --force option to be passed, and the"
646
                                     " target node to be the master",
647
                                     errors.ECODE_INVAL)
648
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
649
        if self.op.force and node.name == master.name:
650
          self.LogWarning("Transitioning from multi-homed to single-homed"
651
                          " cluster; secondary IP addresses will have to be"
652
                          " removed")
653
        else:
654
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
655
                                     " same as the primary IP on a multi-homed"
656
                                     " cluster, unless the --force option is"
657
                                     " passed, and the target node is the"
658
                                     " master", errors.ECODE_INVAL)
659

    
660
      assert not (frozenset(affected_instances) -
661
                  self.owned_locks(locking.LEVEL_INSTANCE))
662

    
663
      if node.offline:
664
        if affected_instances:
665
          msg = ("Cannot change secondary IP address: offline node has"
666
                 " instances (%s) configured to use it" %
667
                 utils.CommaJoin(affected_instances.keys()))
668
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
669
      else:
670
        # On online nodes, check that no instances are running, and that
671
        # the node has the new ip and we can reach it.
672
        for instance in affected_instances.values():
673
          CheckInstanceState(self, instance, INSTANCE_DOWN,
674
                             msg="cannot change secondary ip")
675

    
676
        _CheckNodeHasSecondaryIP(self, node.name, self.op.secondary_ip, True)
677
        if master.name != node.name:
678
          # check reachability from master secondary ip to new secondary ip
679
          if not netutils.TcpPing(self.op.secondary_ip,
680
                                  constants.DEFAULT_NODED_PORT,
681
                                  source=master.secondary_ip):
682
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
683
                                       " based ping to node daemon port",
684
                                       errors.ECODE_ENVIRON)
685

    
686
    if self.op.ndparams:
687
      new_ndparams = GetUpdatedParams(self.node.ndparams, self.op.ndparams)
688
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
689
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
690
                           "node", "cluster or group")
691
      self.new_ndparams = new_ndparams
692

    
693
    if self.op.hv_state:
694
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
695
                                                self.node.hv_state_static)
696

    
697
    if self.op.disk_state:
698
      self.new_disk_state = \
699
        MergeAndVerifyDiskState(self.op.disk_state,
700
                                self.node.disk_state_static)
701

    
702
  def Exec(self, feedback_fn):
703
    """Modifies a node.
704

705
    """
706
    node = self.node
707
    old_role = self.old_role
708
    new_role = self.new_role
709

    
710
    result = []
711

    
712
    if self.op.ndparams:
713
      node.ndparams = self.new_ndparams
714

    
715
    if self.op.powered is not None:
716
      node.powered = self.op.powered
717

    
718
    if self.op.hv_state:
719
      node.hv_state_static = self.new_hv_state
720

    
721
    if self.op.disk_state:
722
      node.disk_state_static = self.new_disk_state
723

    
724
    for attr in ["master_capable", "vm_capable"]:
725
      val = getattr(self.op, attr)
726
      if val is not None:
727
        setattr(node, attr, val)
728
        result.append((attr, str(val)))
729

    
730
    if new_role != old_role:
731
      # Tell the node to demote itself, if no longer MC and not offline
732
      if old_role == self._ROLE_CANDIDATE and new_role != self._ROLE_OFFLINE:
733
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
734
        if msg:
735
          self.LogWarning("Node failed to demote itself: %s", msg)
736

    
737
      new_flags = self._R2F[new_role]
738
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
739
        if of != nf:
740
          result.append((desc, str(nf)))
741
      (node.master_candidate, node.drained, node.offline) = new_flags
742

    
743
      # we locked all nodes, we adjust the CP before updating this node
744
      if self.lock_all:
745
        AdjustCandidatePool(self, [node.name])
746

    
747
    if self.op.secondary_ip:
748
      node.secondary_ip = self.op.secondary_ip
749
      result.append(("secondary_ip", self.op.secondary_ip))
750

    
751
    # this will trigger configuration file update, if needed
752
    self.cfg.Update(node, feedback_fn)
753

    
754
    # this will trigger job queue propagation or cleanup if the mc
755
    # flag changed
756
    if [old_role, new_role].count(self._ROLE_CANDIDATE) == 1:
757
      self.context.ReaddNode(node)
758

    
759
    return result
760

    
761

    
762
class LUNodePowercycle(NoHooksLU):
763
  """Powercycles a node.
764

765
  """
766
  REQ_BGL = False
767

    
768
  def CheckArguments(self):
769
    self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name)
770
    if self.op.node_name == self.cfg.GetMasterNode() and not self.op.force:
771
      raise errors.OpPrereqError("The node is the master and the force"
772
                                 " parameter was not set",
773
                                 errors.ECODE_INVAL)
774

    
775
  def ExpandNames(self):
776
    """Locking for PowercycleNode.
777

778
    This is a last-resort option and shouldn't block on other
779
    jobs. Therefore, we grab no locks.
780

781
    """
782
    self.needed_locks = {}
783

    
784
  def Exec(self, feedback_fn):
785
    """Reboots a node.
786

787
    """
788
    default_hypervisor = self.cfg.GetHypervisorType()
789
    hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
790
    result = self.rpc.call_node_powercycle(self.op.node_name,
791
                                           default_hypervisor,
792
                                           hvparams)
793
    result.Raise("Failed to schedule the reboot")
794
    return result.payload
795

    
796

    
797
def _GetNodeInstancesInner(cfg, fn):
798
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
799

    
800

    
801
def _GetNodePrimaryInstances(cfg, node_name):
802
  """Returns primary instances on a node.
803

804
  """
805
  return _GetNodeInstancesInner(cfg,
806
                                lambda inst: node_name == inst.primary_node)
807

    
808

    
809
def _GetNodeSecondaryInstances(cfg, node_name):
810
  """Returns secondary instances on a node.
811

812
  """
813
  return _GetNodeInstancesInner(cfg,
814
                                lambda inst: node_name in inst.secondary_nodes)
815

    
816

    
817
def _GetNodeInstances(cfg, node_name):
818
  """Returns a list of all primary and secondary instances on a node.
819

820
  """
821

    
822
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
823

    
824

    
825
class LUNodeEvacuate(NoHooksLU):
826
  """Evacuates instances off a list of nodes.
827

828
  """
829
  REQ_BGL = False
830

    
831
  _MODE2IALLOCATOR = {
832
    constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI,
833
    constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC,
834
    constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL,
835
    }
836
  assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES
837
  assert (frozenset(_MODE2IALLOCATOR.values()) ==
838
          constants.IALLOCATOR_NEVAC_MODES)
839

    
840
  def CheckArguments(self):
841
    CheckIAllocatorOrNode(self, "iallocator", "remote_node")
842

    
843
  def ExpandNames(self):
844
    self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name)
845

    
846
    if self.op.remote_node is not None:
847
      self.op.remote_node = ExpandNodeName(self.cfg, self.op.remote_node)
848
      assert self.op.remote_node
849

    
850
      if self.op.remote_node == self.op.node_name:
851
        raise errors.OpPrereqError("Can not use evacuated node as a new"
852
                                   " secondary node", errors.ECODE_INVAL)
853

    
854
      if self.op.mode != constants.NODE_EVAC_SEC:
855
        raise errors.OpPrereqError("Without the use of an iallocator only"
856
                                   " secondary instances can be evacuated",
857
                                   errors.ECODE_INVAL)
858

    
859
    # Declare locks
860
    self.share_locks = ShareAll()
861
    self.needed_locks = {
862
      locking.LEVEL_INSTANCE: [],
863
      locking.LEVEL_NODEGROUP: [],
864
      locking.LEVEL_NODE: [],
865
      }
866

    
867
    # Determine nodes (via group) optimistically, needs verification once locks
868
    # have been acquired
869
    self.lock_nodes = self._DetermineNodes()
870

    
871
  def _DetermineNodes(self):
872
    """Gets the list of nodes to operate on.
873

874
    """
875
    if self.op.remote_node is None:
876
      # Iallocator will choose any node(s) in the same group
877
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_name])
878
    else:
879
      group_nodes = frozenset([self.op.remote_node])
880

    
881
    # Determine nodes to be locked
882
    return set([self.op.node_name]) | group_nodes
883

    
884
  def _DetermineInstances(self):
885
    """Builds list of instances to operate on.
886

887
    """
888
    assert self.op.mode in constants.NODE_EVAC_MODES
889

    
890
    if self.op.mode == constants.NODE_EVAC_PRI:
891
      # Primary instances only
892
      inst_fn = _GetNodePrimaryInstances
893
      assert self.op.remote_node is None, \
894
        "Evacuating primary instances requires iallocator"
895
    elif self.op.mode == constants.NODE_EVAC_SEC:
896
      # Secondary instances only
897
      inst_fn = _GetNodeSecondaryInstances
898
    else:
899
      # All instances
900
      assert self.op.mode == constants.NODE_EVAC_ALL
901
      inst_fn = _GetNodeInstances
902
      # TODO: In 2.6, change the iallocator interface to take an evacuation mode
903
      # per instance
904
      raise errors.OpPrereqError("Due to an issue with the iallocator"
905
                                 " interface it is not possible to evacuate"
906
                                 " all instances at once; specify explicitly"
907
                                 " whether to evacuate primary or secondary"
908
                                 " instances",
909
                                 errors.ECODE_INVAL)
910

    
911
    return inst_fn(self.cfg, self.op.node_name)
912

    
913
  def DeclareLocks(self, level):
914
    if level == locking.LEVEL_INSTANCE:
915
      # Lock instances optimistically, needs verification once node and group
916
      # locks have been acquired
917
      self.needed_locks[locking.LEVEL_INSTANCE] = \
918
        set(i.name for i in self._DetermineInstances())
919

    
920
    elif level == locking.LEVEL_NODEGROUP:
921
      # Lock node groups for all potential target nodes optimistically, needs
922
      # verification once nodes have been acquired
923
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
924
        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)
925

    
926
    elif level == locking.LEVEL_NODE:
927
      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
928

    
929
  def CheckPrereq(self):
930
    # Verify locks
931
    owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
932
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
933
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
934

    
935
    need_nodes = self._DetermineNodes()
936

    
937
    if not owned_nodes.issuperset(need_nodes):
938
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
939
                                 " locks were acquired, current nodes are"
940
                                 " are '%s', used to be '%s'; retry the"
941
                                 " operation" %
942
                                 (self.op.node_name,
943
                                  utils.CommaJoin(need_nodes),
944
                                  utils.CommaJoin(owned_nodes)),
945
                                 errors.ECODE_STATE)
946

    
947
    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
948
    if owned_groups != wanted_groups:
949
      raise errors.OpExecError("Node groups changed since locks were acquired,"
950
                               " current groups are '%s', used to be '%s';"
951
                               " retry the operation" %
952
                               (utils.CommaJoin(wanted_groups),
953
                                utils.CommaJoin(owned_groups)))
954

    
955
    # Determine affected instances
956
    self.instances = self._DetermineInstances()
957
    self.instance_names = [i.name for i in self.instances]
958

    
959
    if set(self.instance_names) != owned_instances:
960
      raise errors.OpExecError("Instances on node '%s' changed since locks"
961
                               " were acquired, current instances are '%s',"
962
                               " used to be '%s'; retry the operation" %
963
                               (self.op.node_name,
964
                                utils.CommaJoin(self.instance_names),
965
                                utils.CommaJoin(owned_instances)))
966

    
967
    if self.instance_names:
968
      self.LogInfo("Evacuating instances from node '%s': %s",
969
                   self.op.node_name,
970
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
971
    else:
972
      self.LogInfo("No instances to evacuate from node '%s'",
973
                   self.op.node_name)
974

    
975
    if self.op.remote_node is not None:
976
      for i in self.instances:
977
        if i.primary_node == self.op.remote_node:
978
          raise errors.OpPrereqError("Node %s is the primary node of"
979
                                     " instance %s, cannot use it as"
980
                                     " secondary" %
981
                                     (self.op.remote_node, i.name),
982
                                     errors.ECODE_INVAL)
983

    
984
  def Exec(self, feedback_fn):
985
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)
986

    
987
    if not self.instance_names:
988
      # No instances to evacuate
989
      jobs = []
990

    
991
    elif self.op.iallocator is not None:
992
      # TODO: Implement relocation to other group
993
      evac_mode = self._MODE2IALLOCATOR[self.op.mode]
994
      req = iallocator.IAReqNodeEvac(evac_mode=evac_mode,
995
                                     instances=list(self.instance_names))
996
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)
997

    
998
      ial.Run(self.op.iallocator)
999

    
1000
      if not ial.success:
1001
        raise errors.OpPrereqError("Can't compute node evacuation using"
1002
                                   " iallocator '%s': %s" %
1003
                                   (self.op.iallocator, ial.info),
1004
                                   errors.ECODE_NORES)
1005

    
1006
      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)
1007

    
1008
    elif self.op.remote_node is not None:
1009
      assert self.op.mode == constants.NODE_EVAC_SEC
1010
      jobs = [
1011
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
1012
                                        remote_node=self.op.remote_node,
1013
                                        disks=[],
1014
                                        mode=constants.REPLACE_DISK_CHG,
1015
                                        early_release=self.op.early_release)]
1016
        for instance_name in self.instance_names]
1017

    
1018
    else:
1019
      raise errors.ProgrammerError("No iallocator or remote node")
1020

    
1021
    return ResultWithJobs(jobs)
1022

    
1023

    
1024
class LUNodeMigrate(LogicalUnit):
1025
  """Migrate all instances from a node.
1026

1027
  """
1028
  HPATH = "node-migrate"
1029
  HTYPE = constants.HTYPE_NODE
1030
  REQ_BGL = False
1031

    
1032
  def CheckArguments(self):
1033
    pass
1034

    
1035
  def ExpandNames(self):
1036
    self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name)
1037

    
1038
    self.share_locks = ShareAll()
1039
    self.needed_locks = {
1040
      locking.LEVEL_NODE: [self.op.node_name],
1041
      }
1042

    
1043
  def BuildHooksEnv(self):
1044
    """Build hooks env.
1045

1046
    This runs on the master, the primary and all the secondaries.
1047

1048
    """
1049
    return {
1050
      "NODE_NAME": self.op.node_name,
1051
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
1052
      }
1053

    
1054
  def BuildHooksNodes(self):
1055
    """Build hooks nodes.
1056

1057
    """
1058
    nl = [self.cfg.GetMasterNode()]
1059
    return (nl, nl)
1060

    
1061
  def CheckPrereq(self):
1062
    pass
1063

    
1064
  def Exec(self, feedback_fn):
1065
    # Prepare jobs for migration instances
1066
    allow_runtime_changes = self.op.allow_runtime_changes
1067
    jobs = [
1068
      [opcodes.OpInstanceMigrate(instance_name=inst.name,
1069
                                 mode=self.op.mode,
1070
                                 live=self.op.live,
1071
                                 iallocator=self.op.iallocator,
1072
                                 target_node=self.op.target_node,
1073
                                 allow_runtime_changes=allow_runtime_changes,
1074
                                 ignore_ipolicy=self.op.ignore_ipolicy)]
1075
      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name)]
1076

    
1077
    # TODO: Run iallocator in this opcode and pass correct placement options to
1078
    # OpInstanceMigrate. Since other jobs can modify the cluster between
1079
    # running the iallocator and the actual migration, a good consistency model
1080
    # will have to be found.
1081

    
1082
    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
1083
            frozenset([self.op.node_name]))
1084

    
1085
    return ResultWithJobs(jobs)
1086

    
1087

    
1088
def _GetStorageTypeArgs(cfg, storage_type):
1089
  """Returns the arguments for a storage type.
1090

1091
  """
1092
  # Special case for file storage
1093
  if storage_type == constants.ST_FILE:
1094
    # storage.FileStorage wants a list of storage directories
1095
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1096

    
1097
  return []
1098

    
1099

    
1100
class LUNodeModifyStorage(NoHooksLU):
1101
  """Logical unit for modifying a storage volume on a node.
1102

1103
  """
1104
  REQ_BGL = False
1105

    
1106
  def CheckArguments(self):
1107
    self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name)
1108

    
1109
    storage_type = self.op.storage_type
1110

    
1111
    try:
1112
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
1113
    except KeyError:
1114
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
1115
                                 " modified" % storage_type,
1116
                                 errors.ECODE_INVAL)
1117

    
1118
    diff = set(self.op.changes.keys()) - modifiable
1119
    if diff:
1120
      raise errors.OpPrereqError("The following fields can not be modified for"
1121
                                 " storage units of type '%s': %r" %
1122
                                 (storage_type, list(diff)),
1123
                                 errors.ECODE_INVAL)
1124

    
1125
  def ExpandNames(self):
1126
    self.needed_locks = {
1127
      locking.LEVEL_NODE: self.op.node_name,
1128
      }
1129

    
1130
  def Exec(self, feedback_fn):
1131
    """Computes the list of nodes and their attributes.
1132

1133
    """
1134
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1135
    result = self.rpc.call_storage_modify(self.op.node_name,
1136
                                          self.op.storage_type, st_args,
1137
                                          self.op.name, self.op.changes)
1138
    result.Raise("Failed to modify storage unit '%s' on %s" %
1139
                 (self.op.name, self.op.node_name))
1140

    
1141

    
1142
class NodeQuery(QueryBase):
1143
  FIELDS = query.NODE_FIELDS
1144

    
1145
  def ExpandNames(self, lu):
1146
    lu.needed_locks = {}
1147
    lu.share_locks = ShareAll()
1148

    
1149
    if self.names:
1150
      self.wanted = GetWantedNodes(lu, self.names)
1151
    else:
1152
      self.wanted = locking.ALL_SET
1153

    
1154
    self.do_locking = (self.use_locking and
1155
                       query.NQ_LIVE in self.requested_data)
1156

    
1157
    if self.do_locking:
1158
      # If any non-static field is requested we need to lock the nodes
1159
      lu.needed_locks[locking.LEVEL_NODE] = self.wanted
1160
      lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1161

    
1162
  def DeclareLocks(self, lu, level):
1163
    pass
1164

    
1165
  def _GetQueryData(self, lu):
1166
    """Computes the list of nodes and their attributes.
1167

1168
    """
1169
    all_info = lu.cfg.GetAllNodesInfo()
1170

    
1171
    nodenames = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)
1172

    
1173
    # Gather data as requested
1174
    if query.NQ_LIVE in self.requested_data:
1175
      # filter out non-vm_capable nodes
1176
      toquery_nodes = [name for name in nodenames if all_info[name].vm_capable]
1177

    
1178
      es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, toquery_nodes)
1179
      # FIXME: This currently maps everything to lvm, this should be more
1180
      # flexible
1181
      vg_req = rpc.BuildVgInfoQuery(lu.cfg)
1182
      default_hypervisor = lu.cfg.GetHypervisorType()
1183
      hvparams = lu.cfg.GetClusterInfo().hvparams[default_hypervisor]
1184
      hvspecs = [(default_hypervisor, hvparams)]
1185
      node_data = lu.rpc.call_node_info(toquery_nodes, vg_req,
1186
                                        hvspecs, es_flags)
1187
      live_data = dict((name, rpc.MakeLegacyNodeInfo(nresult.payload))
1188
                       for (name, nresult) in node_data.items()
1189
                       if not nresult.fail_msg and nresult.payload)
1190
    else:
1191
      live_data = None
1192

    
1193
    if query.NQ_INST in self.requested_data:
1194
      node_to_primary = dict([(name, set()) for name in nodenames])
1195
      node_to_secondary = dict([(name, set()) for name in nodenames])
1196

    
1197
      inst_data = lu.cfg.GetAllInstancesInfo()
1198

    
1199
      for inst in inst_data.values():
1200
        if inst.primary_node in node_to_primary:
1201
          node_to_primary[inst.primary_node].add(inst.name)
1202
        for secnode in inst.secondary_nodes:
1203
          if secnode in node_to_secondary:
1204
            node_to_secondary[secnode].add(inst.name)
1205
    else:
1206
      node_to_primary = None
1207
      node_to_secondary = None
1208

    
1209
    if query.NQ_OOB in self.requested_data:
1210
      oob_support = dict((name, bool(SupportsOob(lu.cfg, node)))
1211
                         for name, node in all_info.iteritems())
1212
    else:
1213
      oob_support = None
1214

    
1215
    if query.NQ_GROUP in self.requested_data:
1216
      groups = lu.cfg.GetAllNodeGroupsInfo()
1217
    else:
1218
      groups = {}
1219

    
1220
    return query.NodeQueryData([all_info[name] for name in nodenames],
1221
                               live_data, lu.cfg.GetMasterNode(),
1222
                               node_to_primary, node_to_secondary, groups,
1223
                               oob_support, lu.cfg.GetClusterInfo())
1224

    
1225

    
1226
class LUNodeQuery(NoHooksLU):
1227
  """Logical unit for querying nodes.
1228

1229
  """
1230
  # pylint: disable=W0142
1231
  REQ_BGL = False
1232

    
1233
  def CheckArguments(self):
1234
    self.nq = NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
1235
                         self.op.output_fields, self.op.use_locking)
1236

    
1237
  def ExpandNames(self):
1238
    self.nq.ExpandNames(self)
1239

    
1240
  def DeclareLocks(self, level):
1241
    self.nq.DeclareLocks(self, level)
1242

    
1243
  def Exec(self, feedback_fn):
1244
    return self.nq.OldStyleQuery(self)
1245

    
1246

    
1247
def _CheckOutputFields(static, dynamic, selected):
1248
  """Checks whether all selected fields are valid.
1249

1250
  @type static: L{utils.FieldSet}
1251
  @param static: static fields set
1252
  @type dynamic: L{utils.FieldSet}
1253
  @param dynamic: dynamic fields set
1254

1255
  """
1256
  f = utils.FieldSet()
1257
  f.Extend(static)
1258
  f.Extend(dynamic)
1259

    
1260
  delta = f.NonMatching(selected)
1261
  if delta:
1262
    raise errors.OpPrereqError("Unknown output fields selected: %s"
1263
                               % ",".join(delta), errors.ECODE_INVAL)
1264

    
1265

    
1266
class LUNodeQueryvols(NoHooksLU):
1267
  """Logical unit for getting volumes on node(s).
1268

1269
  """
1270
  REQ_BGL = False
1271
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1272
  _FIELDS_STATIC = utils.FieldSet("node")
1273

    
1274
  def CheckArguments(self):
1275
    _CheckOutputFields(static=self._FIELDS_STATIC,
1276
                       dynamic=self._FIELDS_DYNAMIC,
1277
                       selected=self.op.output_fields)
1278

    
1279
  def ExpandNames(self):
1280
    self.share_locks = ShareAll()
1281

    
1282
    if self.op.nodes:
1283
      self.needed_locks = {
1284
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes),
1285
        }
1286
    else:
1287
      self.needed_locks = {
1288
        locking.LEVEL_NODE: locking.ALL_SET,
1289
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1290
        }
1291

    
1292
  def Exec(self, feedback_fn):
1293
    """Computes the list of nodes and their attributes.
1294

1295
    """
1296
    nodenames = self.owned_locks(locking.LEVEL_NODE)
1297
    volumes = self.rpc.call_node_volumes(nodenames)
1298

    
1299
    ilist = self.cfg.GetAllInstancesInfo()
1300
    vol2inst = MapInstanceDisksToNodes(ilist.values())
1301

    
1302
    output = []
1303
    for node in nodenames:
1304
      nresult = volumes[node]
1305
      if nresult.offline:
1306
        continue
1307
      msg = nresult.fail_msg
1308
      if msg:
1309
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
1310
        continue
1311

    
1312
      node_vols = sorted(nresult.payload,
                         key=operator.itemgetter("dev"))

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol["dev"]
          elif field == "vg":
            val = vol["vg"]
          elif field == "name":
            val = vol["name"]
          elif field == "size":
            val = int(float(vol["size"]))
          elif field == "instance":
            val = vol2inst.get((node, vol["vg"] + "/" + vol["name"]), "-")
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes),
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    self.nodes = self.owned_locks(locking.LEVEL_NODE)

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

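    # Remember at which position each requested field ends up in the rows
    # returned by the nodes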
    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.nodes,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node in utils.NiceSort(self.nodes):
      nresult = data[node]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
        continue

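      # Index the returned rows by storage unit name so they can be emitted
      # in NiceSort order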
      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node
          elif field == constants.SF_TYPE:
            val = self.op.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result


class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
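    # The node being removed is left out of both hook node lists; the
    # post-phase hook on it is run explicitly from Exec() instead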
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_name)
    except ValueError:
      pass
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_name)
    assert node is not None

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)

    for instance_name, instance in self.cfg.GetAllInstancesInfo().items():
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first" % instance_name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # Promote nodes to master candidate as needed
    AdjustCandidatePool(self, exceptions=[node.name])
    self.context.RemoveNode(node.name)

    # Run post hooks on the node before it's removed
    RunPostHook(self, node.name)

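    # Tell the node to leave the cluster (stop its daemons and, depending on
    # modify_ssh_setup, revert the SSH setup); errors are only logged as
    # warnings, since the node may already be unreachable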
    result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node,
                                              constants.ETC_HOSTS_REMOVE,
                                              node.name, None)
      result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)


class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name)

    storage_type = self.op.storage_type

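    # Only storage types that support the "fix consistency" operation can be
    # repaired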
    if (constants.SO_FIX_CONSISTENCY not in
        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " repaired" % storage_type,
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_name],
      }

  def _CheckFaultyDisks(self, instance, node_name):
    """Ensure faulty disks abort the opcode or at least warn."""
    try:
      if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                 node_name, True):
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                   " node '%s'" % (instance.name, node_name),
                                   errors.ECODE_STATE)
    except errors.OpPrereqError, err:
      if self.op.ignore_consistency:
        self.LogWarning(str(err.args[0]))
      else:
        raise

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Check whether any instance on this node has faulty disks
    for inst in _GetNodeInstances(self.cfg, self.op.node_name):
      if not inst.disks_active:
        continue
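      # Check the instance's disks on all of its other nodes; the node being
      # repaired itself is left out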
      check_nodes = set(inst.all_nodes)
      check_nodes.discard(self.op.node_name)
      for inst_node_name in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_name)

  def Exec(self, feedback_fn):
    feedback_fn("Repairing storage unit '%s' on %s ..." %
                (self.op.name, self.op.node_name))

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_execute(self.op.node_name,
                                           self.op.storage_type, st_args,
                                           self.op.name,
                                           constants.SO_FIX_CONSISTENCY)
    result.Raise("Failed to repair storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))