root / lib / cmdlib / node.py @ c7dd65be

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with nodes."""

import logging
import operator

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import qlang
from ganeti import query
from ganeti import rpc
from ganeti import utils
from ganeti.masterd import iallocator

from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
  ResultWithJobs
from ganeti.cmdlib.common import CheckParamsNotGlobal, \
  MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  IsExclusiveStorageEnabledNode, CheckNodePVs, \
  RedistributeAncillaryFiles, ExpandNodeName, ShareAll, SupportsOob, \
  CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
  AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
  GetWantedNodes, MapInstanceDisksToNodes, RunPostHook, \
  FindFaultyInstanceDisks


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max with one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should
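
# Illustrative example (editorial note, not part of the original module):
# with candidate_pool_size = 10 and GetMasterCandidateStats() returning
# mc_now=3, mc_should=4, the helper above computes min(4 + 1, 10) == 5 and
# returns 3 < 5, i.e. True: the node being considered should promote itself.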


def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: string
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)
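
# Usage note (editorial comment): the two call sites in this module are
# LUNodeAdd.Exec, which passes prereq=False so a missing secondary IP is
# reported as an OpExecError, and LUNodeSetParams.CheckPrereq, which passes
# prereq=True so the same condition surfaces as an OpPrereqError before any
# configuration change is made.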


class LUNodeAdd(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _NFLAGS = ["master_capable", "vm_capable"]

  def CheckArguments(self):
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
    # validate/normalize the node name
    self.hostname = netutils.GetHostname(name=self.op.node_name,
                                         family=self.primary_ip_family)
    self.op.node_name = self.hostname.name

    if self.op.readd and self.op.node_name == self.cfg.GetMasterNode():
      raise errors.OpPrereqError("Cannot readd the master node",
                                 errors.ECODE_STATE)

    if self.op.readd and self.op.group:
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                 " being readded", errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    # Exclude added node
    pre_nodes = list(set(self.cfg.GetNodeList()) - set([self.op.node_name]))
    post_nodes = pre_nodes + [self.op.node_name, ]

    return (pre_nodes, post_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    cfg = self.cfg
    hostname = self.hostname
    node = hostname.name
    primary_ip = self.op.primary_ip = hostname.ip
    if self.op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
                                   " IPv4 address must be given as secondary",
                                   errors.ECODE_INVAL)
      self.op.secondary_ip = primary_ip

    secondary_ip = self.op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip, errors.ECODE_INVAL)

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node, errors.ECODE_EXISTS)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node,
                                 errors.ECODE_NOENT)

    self.changed_primary_ip = False

    for existing_node_name, existing_node in cfg.GetMultiNodeInfo(node_list):
      if self.op.readd and node == existing_node_name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # After this 'if' block, None is no longer a valid value for the
    # _capable op attributes
    if self.op.readd:
      old_node = self.cfg.GetNodeInfo(node)
      assert old_node is not None, "Can't retrieve locked node %s" % node
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, getattr(old_node, attr))
    else:
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, True)

    if self.op.readd and not self.op.vm_capable:
      pri, sec = cfg.GetNodeInstances(node)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % node,
                                   errors.ECODE_STATE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not netutils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                              source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [node]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False

    if self.op.readd:
      self.new_node = old_node
    else:
      node_group = cfg.LookupNodeGroup(self.op.group)
      self.new_node = objects.Node(name=node,
                                   primary_ip=primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=node_group, ndparams={})

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)

    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
    #       it a property on the base class.
    rpcrunner = rpc.DnsOnlyRunner()
    result = rpcrunner.call_version([node])[node]
    result.Raise("Can't get version information from node %s" % node)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node, result.payload)
    else:
      raise errors.OpPrereqError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.payload),
                                 errors.ECODE_ENVIRON)

    vg_name = cfg.GetVGName()
    if vg_name is not None:
      vparams = {constants.NV_PVLIST: [vg_name]}
      excl_stor = IsExclusiveStorageEnabledNode(cfg, self.new_node)
      cname = self.cfg.GetClusterName()
      result = rpcrunner.call_node_verify_light([node], vparams, cname)[node]
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
      if errmsgs:
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # We are adding a new node, so we assume it's powered
    new_node.powered = True

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      new_node.drained = new_node.offline = False # pylint: disable=W0201
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      new_node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        new_node.primary_ip = self.op.primary_ip

    # copy the master/vm_capable flags
    for attr in self._NFLAGS:
      setattr(new_node, attr, getattr(self.op, attr))

    # notify the user about any possible mc promotion
    if new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    if self.op.ndparams:
      new_node.ndparams = self.op.ndparams
    else:
      new_node.ndparams = {}

    if self.op.hv_state:
      new_node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      new_node.disk_state_static = self.new_disk_state

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node,
                                              constants.ETC_HOSTS_ADD,
                                              self.hostname.name,
                                              self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    if new_node.secondary_ip != new_node.primary_ip:
      _CheckNodeHasSecondaryIP(self, new_node.name, new_node.secondary_ip,
                               False)

    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: ([node], {}),
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed")

    if self.op.readd:
      RedistributeAncillaryFiles(self)
      self.context.ReaddNode(new_node)
      # make sure we redistribute the config
      self.cfg.Update(new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(new_node.name)
        result.Warn("Node failed to demote itself from master candidate status",
                    self.LogWarning)
    else:
      RedistributeAncillaryFiles(self, additional_nodes=[node],
                                 additional_vm=self.op.vm_capable)
      self.context.AddNode(new_node, self.proc.GetECId())


class LUNodeSetParams(LogicalUnit):
  """Modifies the parameters of a node.

  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
      to the node role (as _ROLE_*)
  @cvar _R2F: a dictionary from node role to tuples of flags
  @cvar _FLAGS: a list of attribute names corresponding to the flags

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
  _F2R = {
    (True, False, False): _ROLE_CANDIDATE,
    (False, True, False): _ROLE_DRAINED,
    (False, False, True): _ROLE_OFFLINE,
    (False, False, False): _ROLE_REGULAR,
    }
  _R2F = dict((v, k) for k, v in _F2R.items())
  _FLAGS = ["master_candidate", "drained", "offline"]
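
  # Editorial note (not part of the original class): the flag tuples are
  # ordered (master_candidate, drained, offline), so for example
  # _F2R[(True, False, False)] == _ROLE_CANDIDATE and
  # _R2F[_ROLE_OFFLINE] == (False, False, True); CheckArguments below rejects
  # opcodes that try to set more than one of these flags at once.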

  def CheckArguments(self):
    self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name)
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
                self.op.master_capable, self.op.vm_capable,
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
                self.op.disk_state]
    if all_mods.count(None) == len(all_mods):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # Boolean value that tells us whether we might be demoting from MC
    self.might_demote = (self.op.master_candidate is False or
                         self.op.offline is True or
                         self.op.drained is True or
                         self.op.master_capable is False)

    if self.op.secondary_ip:
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                   " address" % self.op.secondary_ip,
                                   errors.ECODE_INVAL)

    self.lock_all = self.op.auto_promote and self.might_demote
    self.lock_instances = self.op.secondary_ip is not None

  def _InstanceFilter(self, instance):
    """Filter for getting affected instances.

    """
    return (instance.disk_template in constants.DTS_INT_MIRROR and
            self.op.node_name in instance.all_nodes)

  def ExpandNames(self):
    if self.lock_all:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,

        # Block allocations when all nodes are locked
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: self.op.node_name,
        }

    # Since modifying a node can have severe effects on currently running
    # operations the resource lock is at least acquired in shared mode
    self.needed_locks[locking.LEVEL_NODE_RES] = \
      self.needed_locks[locking.LEVEL_NODE]

    # Get all locks except nodes in shared mode; they are not used for anything
    # but read-only access
    self.share_locks = ShareAll()
    self.share_locks[locking.LEVEL_NODE] = 0
    self.share_locks[locking.LEVEL_NODE_RES] = 0
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 0

    if self.lock_instances:
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        frozenset(self.cfg.GetInstancesInfoByFilter(self._InstanceFilter))

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode(), self.op.node_name]
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)

    if self.lock_instances:
      affected_instances = \
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)

      # Verify instance locks
      owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
      wanted_instances = frozenset(affected_instances.keys())
      if wanted_instances - owned_instances:
        raise errors.OpPrereqError("Instances affected by changing node %s's"
                                   " secondary IP address have changed since"
                                   " locks were acquired, wanted '%s', have"
                                   " '%s'; retry the operation" %
                                   (self.op.node_name,
                                    utils.CommaJoin(wanted_instances),
                                    utils.CommaJoin(owned_instances)),
                                   errors.ECODE_STATE)
    else:
      affected_instances = None

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if self.op.node_name == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via master-failover",
                                   errors.ECODE_INVAL)

    if self.op.master_candidate and not node.master_capable:
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
                                 " it a master candidate" % node.name,
                                 errors.ECODE_STATE)

    if self.op.vm_capable is False:
      (ipri, isec) = self.cfg.GetNodeInstances(self.op.node_name)
      if ipri or isec:
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
                                   " the vm_capable flag" % node.name,
                                   errors.ECODE_STATE)

    if node.master_candidate and self.might_demote and not self.lock_all:
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
      # check if after removing the current node, we're missing master
      # candidates
      (mc_remaining, mc_should, _) = \
          self.cfg.GetMasterCandidateStats(exceptions=[node.name])
      if mc_remaining < mc_should:
        raise errors.OpPrereqError("Not enough master candidates, please"
                                   " pass auto promote option to allow"
                                   " promotion (--auto-promote or RAPI"
                                   " auto_promote=True)", errors.ECODE_STATE)

    self.old_flags = old_flags = (node.master_candidate,
                                  node.drained, node.offline)
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
    self.old_role = old_role = self._F2R[old_flags]

    # Check for ineffective changes
    for attr in self._FLAGS:
      if (getattr(self.op, attr) is False and getattr(node, attr) is False):
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
        setattr(self.op, attr, None)

    # Past this point, any flag change to False means a transition
    # away from the respective state, as only real changes are kept

    # TODO: We might query the real power state if it supports OOB
    if SupportsOob(self.cfg, node):
      if self.op.offline is False and not (node.powered or
                                           self.op.powered is True):
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
                                    " offline status can be reset") %
                                   self.op.node_name, errors.ECODE_STATE)
    elif self.op.powered is not None:
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
                                  " as it does not support out-of-band"
                                  " handling") % self.op.node_name,
                                 errors.ECODE_STATE)

    # If we're being deofflined/drained, we'll MC ourself if needed
    if (self.op.drained is False or self.op.offline is False or
        (self.op.master_capable and not node.master_capable)):
      if _DecideSelfPromotion(self):
        self.op.master_candidate = True
        self.LogInfo("Auto-promoting node to master candidate")

    # If we're no longer master capable, we'll demote ourselves from MC
    if self.op.master_capable is False and node.master_candidate:
      self.LogInfo("Demoting from master candidate")
      self.op.master_candidate = False

    # Compute new role
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
    if self.op.master_candidate:
      new_role = self._ROLE_CANDIDATE
    elif self.op.drained:
      new_role = self._ROLE_DRAINED
    elif self.op.offline:
      new_role = self._ROLE_OFFLINE
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
      # False is still in new flags, which means we're un-setting (the
      # only) True flag
      new_role = self._ROLE_REGULAR
    else: # no new flags, nothing, keep old role
      new_role = old_role
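
    # Worked example (editorial comment, not part of the original method):
    # for a node that is currently a master candidate (old_flags ==
    # (True, False, False)), an opcode setting only offline=True selects
    # _ROLE_OFFLINE above; one setting only master_candidate=False hits the
    # "False in [...]" branch and selects _ROLE_REGULAR; with no flags at all
    # the old role is kept.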

    self.new_role = new_role

    if old_role == self._ROLE_OFFLINE and new_role != old_role:
      # Trying to transition out of offline status
      result = self.rpc.call_version([node.name])[node.name]
      if result.fail_msg:
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
                                   " to report its version: %s" %
                                   (node.name, result.fail_msg),
                                   errors.ECODE_STATE)
      else:
        self.LogWarning("Transitioning node from offline to online state"
                        " without using re-add. Please make sure the node"
                        " is healthy!")

    # When changing the secondary ip, verify if this is a single-homed to
    # multi-homed transition or vice versa, and apply the relevant
    # restrictions.
    if self.op.secondary_ip:
      # Ok even without locking, because this can't be changed by any LU
      master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
      master_singlehomed = master.secondary_ip == master.primary_ip
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
        if self.op.force and node.name == master.name:
          self.LogWarning("Transitioning from single-homed to multi-homed"
                          " cluster; all nodes will require a secondary IP"
                          " address")
        else:
          raise errors.OpPrereqError("Changing the secondary ip on a"
                                     " single-homed cluster requires the"
                                     " --force option to be passed, and the"
                                     " target node to be the master",
                                     errors.ECODE_INVAL)
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
        if self.op.force and node.name == master.name:
          self.LogWarning("Transitioning from multi-homed to single-homed"
                          " cluster; secondary IP addresses will have to be"
                          " removed")
        else:
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
                                     " same as the primary IP on a multi-homed"
                                     " cluster, unless the --force option is"
                                     " passed, and the target node is the"
                                     " master", errors.ECODE_INVAL)

      assert not (frozenset(affected_instances) -
                  self.owned_locks(locking.LEVEL_INSTANCE))

      if node.offline:
        if affected_instances:
          msg = ("Cannot change secondary IP address: offline node has"
                 " instances (%s) configured to use it" %
                 utils.CommaJoin(affected_instances.keys()))
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
      else:
        # On online nodes, check that no instances are running, and that
        # the node has the new ip and we can reach it.
        for instance in affected_instances.values():
          CheckInstanceState(self, instance, INSTANCE_DOWN,
                             msg="cannot change secondary ip")

        _CheckNodeHasSecondaryIP(self, node.name, self.op.secondary_ip, True)
        if master.name != node.name:
          # check reachability from master secondary ip to new secondary ip
          if not netutils.TcpPing(self.op.secondary_ip,
                                  constants.DEFAULT_NODED_PORT,
                                  source=master.secondary_ip):
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                       " based ping to node daemon port",
                                       errors.ECODE_ENVIRON)

    if self.op.ndparams:
      new_ndparams = GetUpdatedParams(self.node.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")
      self.new_ndparams = new_ndparams

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                                self.node.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        MergeAndVerifyDiskState(self.op.disk_state,
                                self.node.disk_state_static)

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.node
    old_role = self.old_role
    new_role = self.new_role

    result = []

    if self.op.ndparams:
      node.ndparams = self.new_ndparams

    if self.op.powered is not None:
      node.powered = self.op.powered

    if self.op.hv_state:
      node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      node.disk_state_static = self.new_disk_state

    for attr in ["master_capable", "vm_capable"]:
      val = getattr(self.op, attr)
      if val is not None:
        setattr(node, attr, val)
        result.append((attr, str(val)))

    if new_role != old_role:
      # Tell the node to demote itself, if no longer MC and not offline
      if old_role == self._ROLE_CANDIDATE and new_role != self._ROLE_OFFLINE:
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s", msg)

      new_flags = self._R2F[new_role]
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
        if of != nf:
          result.append((desc, str(nf)))
      (node.master_candidate, node.drained, node.offline) = new_flags

      # we locked all nodes, we adjust the CP before updating this node
      if self.lock_all:
        AdjustCandidatePool(self, [node.name])

    if self.op.secondary_ip:
      node.secondary_ip = self.op.secondary_ip
      result.append(("secondary_ip", self.op.secondary_ip))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)

    # this will trigger job queue propagation or cleanup if the mc
    # flag changed
    if [old_role, new_role].count(self._ROLE_CANDIDATE) == 1:
      self.context.ReaddNode(node)

    return result


class LUNodePowercycle(NoHooksLU):
  """Powercycles a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name)
    if self.op.node_name == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    result = self.rpc.call_node_powercycle(self.op.node_name,
                                           self.cfg.GetHypervisorType())
    result.Raise("Failed to schedule the reboot")
    return result.payload


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodePrimaryInstances(cfg, node_name):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_name):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name in inst.secondary_nodes)


def _GetNodeInstances(cfg, node_name):
  """Returns a list of all primary and secondary instances on a node.

  """

  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
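
# Illustrative note (editorial comment, not part of the original module): for
# a DRBD instance whose primary_node is "node1" and whose secondary_nodes is
# ("node2",), _GetNodePrimaryInstances(cfg, "node1") and
# _GetNodeSecondaryInstances(cfg, "node2") both return it, while
# _GetNodeInstances(cfg, name) returns it for either node name (the node
# names here are only examples).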


class LUNodeEvacuate(NoHooksLU):
  """Evacuates instances off a list of nodes.

  """
  REQ_BGL = False

  _MODE2IALLOCATOR = {
    constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI,
    constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC,
    constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL,
    }
  assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES
  assert (frozenset(_MODE2IALLOCATOR.values()) ==
          constants.IALLOCATOR_NEVAC_MODES)
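
  # Editorial note (not part of the original class): the opcode-level
  # NODE_EVAC_* modes map one-to-one onto the iallocator's
  # IALLOCATOR_NEVAC_* modes, e.g. _MODE2IALLOCATOR[constants.NODE_EVAC_PRI]
  # == constants.IALLOCATOR_NEVAC_PRI; the asserts above only verify that the
  # two constant sets stay in sync.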

  def CheckArguments(self):
    CheckIAllocatorOrNode(self, "iallocator", "remote_node")

  def ExpandNames(self):
    self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name)

    if self.op.remote_node is not None:
      self.op.remote_node = ExpandNodeName(self.cfg, self.op.remote_node)
      assert self.op.remote_node

      if self.op.remote_node == self.op.node_name:
        raise errors.OpPrereqError("Can not use evacuated node as a new"
                                   " secondary node", errors.ECODE_INVAL)

      if self.op.mode != constants.NODE_EVAC_SEC:
        raise errors.OpPrereqError("Without the use of an iallocator only"
                                   " secondary instances can be evacuated",
                                   errors.ECODE_INVAL)

    # Declare locks
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

    # Determine nodes (via group) optimistically, needs verification once locks
    # have been acquired
    self.lock_nodes = self._DetermineNodes()

  def _DetermineNodes(self):
    """Gets the list of nodes to operate on.

    """
    if self.op.remote_node is None:
      # Iallocator will choose any node(s) in the same group
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_name])
    else:
      group_nodes = frozenset([self.op.remote_node])

    # Determine nodes to be locked
    return set([self.op.node_name]) | group_nodes

  def _DetermineInstances(self):
    """Builds list of instances to operate on.

    """
    assert self.op.mode in constants.NODE_EVAC_MODES

    if self.op.mode == constants.NODE_EVAC_PRI:
      # Primary instances only
      inst_fn = _GetNodePrimaryInstances
      assert self.op.remote_node is None, \
        "Evacuating primary instances requires iallocator"
    elif self.op.mode == constants.NODE_EVAC_SEC:
      # Secondary instances only
      inst_fn = _GetNodeSecondaryInstances
    else:
      # All instances
      assert self.op.mode == constants.NODE_EVAC_ALL
      inst_fn = _GetNodeInstances
      # TODO: In 2.6, change the iallocator interface to take an evacuation mode
      # per instance
      raise errors.OpPrereqError("Due to an issue with the iallocator"
                                 " interface it is not possible to evacuate"
                                 " all instances at once; specify explicitly"
                                 " whether to evacuate primary or secondary"
                                 " instances",
                                 errors.ECODE_INVAL)

    return inst_fn(self.cfg, self.op.node_name)

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        set(i.name for i in self._DetermineInstances())

    elif level == locking.LEVEL_NODEGROUP:
      # Lock node groups for all potential target nodes optimistically, needs
      # verification once nodes have been acquired
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)

    elif level == locking.LEVEL_NODE:
      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes

  def CheckPrereq(self):
    # Verify locks
    owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)

    need_nodes = self._DetermineNodes()

    if not owned_nodes.issuperset(need_nodes):
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
                                 " locks were acquired, current nodes are"
                                 " '%s', used to be '%s'; retry the"
                                 " operation" %
                                 (self.op.node_name,
                                  utils.CommaJoin(need_nodes),
                                  utils.CommaJoin(owned_nodes)),
                                 errors.ECODE_STATE)

    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
    if owned_groups != wanted_groups:
      raise errors.OpExecError("Node groups changed since locks were acquired,"
                               " current groups are '%s', used to be '%s';"
                               " retry the operation" %
                               (utils.CommaJoin(wanted_groups),
                                utils.CommaJoin(owned_groups)))

    # Determine affected instances
    self.instances = self._DetermineInstances()
    self.instance_names = [i.name for i in self.instances]

    if set(self.instance_names) != owned_instances:
      raise errors.OpExecError("Instances on node '%s' changed since locks"
                               " were acquired, current instances are '%s',"
                               " used to be '%s'; retry the operation" %
                               (self.op.node_name,
                                utils.CommaJoin(self.instance_names),
                                utils.CommaJoin(owned_instances)))

    if self.instance_names:
      self.LogInfo("Evacuating instances from node '%s': %s",
                   self.op.node_name,
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
    else:
      self.LogInfo("No instances to evacuate from node '%s'",
                   self.op.node_name)

    if self.op.remote_node is not None:
      for i in self.instances:
        if i.primary_node == self.op.remote_node:
          raise errors.OpPrereqError("Node %s is the primary node of"
                                     " instance %s, cannot use it as"
                                     " secondary" %
                                     (self.op.remote_node, i.name),
                                     errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)

    if not self.instance_names:
      # No instances to evacuate
      jobs = []

    elif self.op.iallocator is not None:
      # TODO: Implement relocation to other group
      evac_mode = self._MODE2IALLOCATOR[self.op.mode]
      req = iallocator.IAReqNodeEvac(evac_mode=evac_mode,
                                     instances=list(self.instance_names))
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)

      ial.Run(self.op.iallocator)

      if not ial.success:
        raise errors.OpPrereqError("Can't compute node evacuation using"
                                   " iallocator '%s': %s" %
                                   (self.op.iallocator, ial.info),
                                   errors.ECODE_NORES)

      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)

    elif self.op.remote_node is not None:
      assert self.op.mode == constants.NODE_EVAC_SEC
      jobs = [
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
                                        remote_node=self.op.remote_node,
                                        disks=[],
                                        mode=constants.REPLACE_DISK_CHG,
                                        early_release=self.op.early_release)]
        for instance_name in self.instance_names]

    else:
      raise errors.ProgrammerError("No iallocator or remote node")

    return ResultWithJobs(jobs)


class LUNodeMigrate(LogicalUnit):
  """Migrate all instances from a node.

  """
  HPATH = "node-migrate"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False

  def CheckArguments(self):
    pass

  def ExpandNames(self):
    self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_name],
      }

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    return {
      "NODE_NAME": self.op.node_name,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()]
    return (nl, nl)

  def CheckPrereq(self):
    pass

  def Exec(self, feedback_fn):
    # Prepare jobs for migration instances
    allow_runtime_changes = self.op.allow_runtime_changes
    jobs = [
      [opcodes.OpInstanceMigrate(instance_name=inst.name,
                                 mode=self.op.mode,
                                 live=self.op.live,
                                 iallocator=self.op.iallocator,
                                 target_node=self.op.target_node,
                                 allow_runtime_changes=allow_runtime_changes,
                                 ignore_ipolicy=self.op.ignore_ipolicy)]
      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name)]

    # TODO: Run iallocator in this opcode and pass correct placement options to
    # OpInstanceMigrate. Since other jobs can modify the cluster between
    # running the iallocator and the actual migration, a good consistency model
    # will have to be found.

    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset([self.op.node_name]))

    return ResultWithJobs(jobs)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]

  return []
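
# Illustrative note (editorial comment, not part of the original module): for
# storage_type == constants.ST_FILE the helper returns a one-element argument
# list wrapping the configured file and shared-file storage directories, e.g.
# [["/srv/ganeti/file-storage", "/srv/ganeti/shared-file-storage"]] (these
# paths are only examples); for every other storage type it returns [].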


class LUNodeModifyStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name)

    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_name,
      }

  def Exec(self, feedback_fn):
    """Modifies a storage unit on the given node.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_name,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


class NodeQuery(QueryBase):
  FIELDS = query.NODE_FIELDS

  def ExpandNames(self, lu):
    lu.needed_locks = {}
    lu.share_locks = ShareAll()

    if self.names:
      self.wanted = GetWantedNodes(lu, self.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = (self.use_locking and
                       query.NQ_LIVE in self.requested_data)

    if self.do_locking:
      # If any non-static field is requested we need to lock the nodes
      lu.needed_locks[locking.LEVEL_NODE] = self.wanted
      lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    all_info = lu.cfg.GetAllNodesInfo()

    nodenames = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)

    # Gather data as requested
    if query.NQ_LIVE in self.requested_data:
      # filter out non-vm_capable nodes
      toquery_nodes = [name for name in nodenames if all_info[name].vm_capable]

      es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, toquery_nodes)
      # FIXME: This currently maps everything to lvm, this should be more
      # flexible
      storage_units = [(constants.ST_LVM_VG, lu.cfg.GetVGName())]
      node_data = lu.rpc.call_node_info(toquery_nodes, storage_units,
                                        [lu.cfg.GetHypervisorType()], es_flags)
      live_data = dict((name, rpc.MakeLegacyNodeInfo(nresult.payload))
                       for (name, nresult) in node_data.items()
                       if not nresult.fail_msg and nresult.payload)
    else:
      live_data = None

    if query.NQ_INST in self.requested_data:
      node_to_primary = dict([(name, set()) for name in nodenames])
      node_to_secondary = dict([(name, set()) for name in nodenames])

      inst_data = lu.cfg.GetAllInstancesInfo()

      for inst in inst_data.values():
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)
    else:
      node_to_primary = None
      node_to_secondary = None

    if query.NQ_OOB in self.requested_data:
      oob_support = dict((name, bool(SupportsOob(lu.cfg, node)))
                         for name, node in all_info.iteritems())
    else:
      oob_support = None

    if query.NQ_GROUP in self.requested_data:
      groups = lu.cfg.GetAllNodeGroupsInfo()
    else:
      groups = {}

    return query.NodeQueryData([all_info[name] for name in nodenames],
                               live_data, lu.cfg.GetMasterNode(),
                               node_to_primary, node_to_secondary, groups,
                               oob_support, lu.cfg.GetClusterInfo())


class LUNodeQuery(NoHooksLU):
  """Logical unit for querying nodes.

  """
  # pylint: disable=W0142
  REQ_BGL = False

  def CheckArguments(self):
    self.nq = NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
                        self.op.output_fields, self.op.use_locking)

  def ExpandNames(self):
    self.nq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.nq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    return self.nq.OldStyleQuery(self)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)
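
# Illustrative note (editorial comment, not part of the original module): the
# query LUs below combine a static and a dynamic FieldSet, so for
# LUNodeQueryvols a call like
# _CheckOutputFields(static=utils.FieldSet("node"),
#                    dynamic=utils.FieldSet("phys", "vg", "name", "size",
#                                           "instance"),
#                    selected=["node", "size"])
# passes, while an unknown field such as "bogus" raises OpPrereqError with
# ECODE_INVAL.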


class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes),
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def Exec(self, feedback_fn):
    """Computes the list of volumes and their attributes.

    """
    nodenames = self.owned_locks(locking.LEVEL_NODE)
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = self.cfg.GetAllInstancesInfo()
    vol2inst = MapInstanceDisksToNodes(ilist.values())

    output = []
    for node in nodenames:
      nresult = volumes[node]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
        continue

      node_vols = sorted(nresult.payload,
                         key=operator.itemgetter("dev"))

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol["dev"]
          elif field == "vg":
            val = vol["vg"]
          elif field == "name":
            val = vol["name"]
          elif field == "size":
            val = int(float(vol["size"]))
          elif field == "instance":
            val = vol2inst.get((node, vol["vg"] + "/" + vol["name"]), "-")
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output

1330

    
1331
class LUNodeQueryStorage(NoHooksLU):
1332
  """Logical unit for getting information on storage units on node(s).
1333

1334
  """
1335
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
1336
  REQ_BGL = False
1337

    
1338
  def CheckArguments(self):
1339
    _CheckOutputFields(static=self._FIELDS_STATIC,
1340
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
1341
                       selected=self.op.output_fields)
1342

    
1343
  def ExpandNames(self):
1344
    self.share_locks = ShareAll()
1345

    
1346
    if self.op.nodes:
1347
      self.needed_locks = {
1348
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes),
1349
        }
1350
    else:
1351
      self.needed_locks = {
1352
        locking.LEVEL_NODE: locking.ALL_SET,
1353
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1354
        }
1355

    
1356
  def Exec(self, feedback_fn):
1357
    """Computes the list of nodes and their attributes.
1358

1359
    """
1360
    self.nodes = self.owned_locks(locking.LEVEL_NODE)
1361

    
1362
    # Always get name to sort by
1363
    if constants.SF_NAME in self.op.output_fields:
1364
      fields = self.op.output_fields[:]
1365
    else:
1366
      fields = [constants.SF_NAME] + self.op.output_fields
1367

    
1368
    # Never ask for node or type as it's only known to the LU
1369
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
1370
      while extra in fields:
1371
        fields.remove(extra)
1372

    
1373
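    # Map each field name to its column index in the RPC reply; the name
    # column is needed later to key and sort the result rows.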
    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

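    # Ask every locked node for its storage units of the requested type;
    # the rows are re-emitted below in the order of the fields the user
    # actually selected.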
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.nodes,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node in utils.NiceSort(self.nodes):
      nresult = data[node]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node
          elif field == constants.SF_TYPE:
            val = self.op.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result


class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_name)
    except ValueError:
      pass
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_name)
    assert node is not None

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)

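    # A node can only be removed if no instance uses it as primary or
    # secondary node.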
    for instance_name, instance in self.cfg.GetAllInstancesInfo().items():
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first" % instance_name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # Promote nodes to master candidate as needed
    AdjustCandidatePool(self, exceptions=[node.name])
    self.context.RemoveNode(node.name)

    # Run post hooks on the node before it's removed
    RunPostHook(self, node.name)

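    # The node has already been removed from the configuration at this
    # point, so failures while shutting down the node daemon and cleaning
    # up the node itself are only reported as warnings.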
    result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node,
                                              constants.ETC_HOSTS_REMOVE,
                                              node.name, None)
      result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)


class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name)

    storage_type = self.op.storage_type

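    # Only storage types that support the "fix consistency" operation can
    # be repaired through this opcode.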
    if (constants.SO_FIX_CONSISTENCY not in
        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " repaired" % storage_type,
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_name],
      }

  def _CheckFaultyDisks(self, instance, node_name):
    """Ensure faulty disks abort the opcode or at least warn."""
    try:
      if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                 node_name, True):
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                   " node '%s'" % (instance.name, node_name),
                                   errors.ECODE_STATE)
    except errors.OpPrereqError, err:
      if self.op.ignore_consistency:
        self.LogWarning(str(err.args[0]))
      else:
        raise

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Check whether any instance on this node has faulty disks
    for inst in _GetNodeInstances(self.cfg, self.op.node_name):
      if not inst.disks_active:
        continue
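      # The node being repaired is excluded from the check; faulty disks on
      # the instance's other nodes abort the operation unless
      # ignore_consistency is set, in which case they only cause a warning.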
      check_nodes = set(inst.all_nodes)
      check_nodes.discard(self.op.node_name)
      for inst_node_name in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_name)

  def Exec(self, feedback_fn):
    feedback_fn("Repairing storage unit '%s' on %s ..." %
                (self.op.name, self.op.node_name))

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_execute(self.op.node_name,
                                           self.op.storage_type, st_args,
                                           self.op.name,
                                           constants.SO_FIX_CONSISTENCY)
    result.Raise("Failed to repair storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))