
root / lib / cmdlib / node.py @ 1c3231aa


#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with nodes."""

import logging
import operator

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import qlang
from ganeti import query
from ganeti import rpc
from ganeti import utils
from ganeti.masterd import iallocator

from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
  ResultWithJobs
from ganeti.cmdlib.common import CheckParamsNotGlobal, \
  MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  IsExclusiveStorageEnabledNode, CheckNodePVs, \
  RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
  CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
  AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
  GetWantedNodes, MapInstanceDisksToNodes, RunPostHook, \
  FindFaultyInstanceDisks


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max with one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should
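
# For illustration of _DecideSelfPromotion (the values below are assumed, not
# taken from a real cluster): with candidate_pool_size == 10, mc_now == 3 and
# mc_should == 3, the helper computes min(3 + 1, 10) == 4 and returns True
# because 3 < 4, i.e. the node on whose behalf it runs should promote itself.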


def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: L{objects.Node}
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip,
  and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  # this can be called with a new node, which has no UUID yet, so perform the
  # RPC call using its name
  result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node.name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)


class LUNodeAdd(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _NFLAGS = ["master_capable", "vm_capable"]

  def CheckArguments(self):
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
    # validate/normalize the node name
    self.hostname = netutils.GetHostname(name=self.op.node_name,
                                         family=self.primary_ip_family)
    self.op.node_name = self.hostname.name

    if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
      raise errors.OpPrereqError("Cannot readd the master node",
                                 errors.ECODE_STATE)

    if self.op.readd and self.op.group:
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                 " being readded", errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    hook_nodes = self.cfg.GetNodeList()
    new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
    if new_node_info is not None:
      # Exclude added node
      hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))

    # add the new node as post hook node by name; it does not have a UUID yet
    return (hook_nodes, hook_nodes, [self.op.node_name, ])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    cfg = self.cfg
    hostname = self.hostname
    node_name = hostname.name
    primary_ip = self.op.primary_ip = hostname.ip
    if self.op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
                                   " IPv4 address must be given as secondary",
                                   errors.ECODE_INVAL)
      self.op.secondary_ip = primary_ip

    secondary_ip = self.op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip, errors.ECODE_INVAL)

    existing_node_info = cfg.GetNodeInfoByName(node_name)
    if not self.op.readd and existing_node_info is not None:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node_name, errors.ECODE_EXISTS)
    elif self.op.readd and existing_node_info is None:
      raise errors.OpPrereqError("Node %s is not in the configuration" %
                                 node_name, errors.ECODE_NOENT)

    self.changed_primary_ip = False

    for existing_node in cfg.GetAllNodesInfo().values():
      if self.op.readd and node_name == existing_node.name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # After this 'if' block, None is no longer a valid value for the
    # _capable op attributes
    if self.op.readd:
      assert existing_node_info is not None, \
        "Can't retrieve locked node %s" % node_name
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, getattr(existing_node_info, attr))
    else:
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, True)

    if self.op.readd and not self.op.vm_capable:
      pri, sec = cfg.GetNodeInstances(existing_node_info.uuid)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % node_name,
                                   errors.ECODE_STATE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not netutils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                              source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [existing_node_info.uuid]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False

    if self.op.readd:
      self.new_node = existing_node_info
    else:
      node_group = cfg.LookupNodeGroup(self.op.group)
      self.new_node = objects.Node(name=node_name,
                                   primary_ip=primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=node_group, ndparams={})

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)

    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
    #       it a property on the base class.
    rpcrunner = rpc.DnsOnlyRunner()
    result = rpcrunner.call_version([node_name])[node_name]
    result.Raise("Can't get version information from node %s" % node_name)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node_name, result.payload)
    else:
      raise errors.OpPrereqError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.payload),
                                 errors.ECODE_ENVIRON)

    vg_name = cfg.GetVGName()
    if vg_name is not None:
      vparams = {constants.NV_PVLIST: [vg_name]}
      excl_stor = IsExclusiveStorageEnabledNode(cfg, self.new_node)
      cname = self.cfg.GetClusterName()
      result = rpcrunner.call_node_verify_light(
          [node_name], vparams, cname, cfg.GetClusterInfo().hvparams)[node_name]
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
      if errmsgs:
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node_name = new_node.name

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # We are adding a new node, so we assume it's powered
    new_node.powered = True

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      new_node.drained = new_node.offline = False # pylint: disable=W0201
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      new_node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        new_node.primary_ip = self.op.primary_ip

    # copy the master/vm_capable flags
    for attr in self._NFLAGS:
      setattr(new_node, attr, getattr(self.op, attr))

    # notify the user about any possible mc promotion
    if new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    if self.op.ndparams:
      new_node.ndparams = self.op.ndparams
    else:
      new_node.ndparams = {}

    if self.op.hv_state:
      new_node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      new_node.disk_state_static = self.new_disk_state

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(
                 master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
                 self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    if new_node.secondary_ip != new_node.primary_ip:
      _CheckNodeHasSecondaryIP(self, new_node, new_node.secondary_ip, False)

    node_verifier_uuids = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: ([node_name], {}),
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(
               node_verifier_uuids, node_verify_param,
               self.cfg.GetClusterName(),
               self.cfg.GetClusterInfo().hvparams)
    for verifier in node_verifier_uuids:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed")

    if self.op.readd:
      self.context.ReaddNode(new_node)
      RedistributeAncillaryFiles(self)
      # make sure we redistribute the config
      self.cfg.Update(new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(new_node.uuid)
        result.Warn("Node failed to demote itself from master candidate status",
                    self.LogWarning)
    else:
      self.context.AddNode(new_node, self.proc.GetECId())
      RedistributeAncillaryFiles(self)


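# The class below encodes node state as a single role derived from the three
# mutually exclusive flags (master_candidate, drained, offline): _F2R maps a
# flag tuple to a role constant and _R2F is its inverse, used in Exec() to
# write the resulting flags back onto the node object.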
class LUNodeSetParams(LogicalUnit):
  """Modifies the parameters of a node.

  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
      to the node role (as _ROLE_*)
  @cvar _R2F: a dictionary from node role to tuples of flags
  @cvar _FLAGS: a list of attribute names corresponding to the flags

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
  _F2R = {
    (True, False, False): _ROLE_CANDIDATE,
    (False, True, False): _ROLE_DRAINED,
    (False, False, True): _ROLE_OFFLINE,
    (False, False, False): _ROLE_REGULAR,
    }
  _R2F = dict((v, k) for k, v in _F2R.items())
  _FLAGS = ["master_candidate", "drained", "offline"]

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
                self.op.master_capable, self.op.vm_capable,
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
                self.op.disk_state]
    if all_mods.count(None) == len(all_mods):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # Boolean value that tells us whether we might be demoting from MC
    self.might_demote = (self.op.master_candidate is False or
                         self.op.offline is True or
                         self.op.drained is True or
                         self.op.master_capable is False)

    if self.op.secondary_ip:
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                   " address" % self.op.secondary_ip,
                                   errors.ECODE_INVAL)

    self.lock_all = self.op.auto_promote and self.might_demote
    self.lock_instances = self.op.secondary_ip is not None

  def _InstanceFilter(self, instance):
    """Filter for getting affected instances.

    """
    return (instance.disk_template in constants.DTS_INT_MIRROR and
            self.op.node_uuid in instance.all_nodes)

  def ExpandNames(self):
    if self.lock_all:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,

        # Block allocations when all nodes are locked
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: self.op.node_uuid,
        }

    # Since modifying a node can have severe effects on currently running
    # operations the resource lock is at least acquired in shared mode
    self.needed_locks[locking.LEVEL_NODE_RES] = \
      self.needed_locks[locking.LEVEL_NODE]

    # Get all locks except nodes in shared mode; they are not used for anything
    # but read-only access
    self.share_locks = ShareAll()
    self.share_locks[locking.LEVEL_NODE] = 0
    self.share_locks[locking.LEVEL_NODE_RES] = 0
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 0

    if self.lock_instances:
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        frozenset(self.cfg.GetInstancesInfoByFilter(self._InstanceFilter))

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    if self.lock_instances:
      affected_instances = \
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)

      # Verify instance locks
      owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
      wanted_instances = frozenset(affected_instances.keys())
      if wanted_instances - owned_instances:
        raise errors.OpPrereqError("Instances affected by changing node %s's"
                                   " secondary IP address have changed since"
                                   " locks were acquired, wanted '%s', have"
                                   " '%s'; retry the operation" %
                                   (node.name,
                                    utils.CommaJoin(wanted_instances),
                                    utils.CommaJoin(owned_instances)),
                                   errors.ECODE_STATE)
    else:
      affected_instances = None

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if node.uuid == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via master-failover",
                                   errors.ECODE_INVAL)

    if self.op.master_candidate and not node.master_capable:
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
                                 " it a master candidate" % node.name,
                                 errors.ECODE_STATE)

    if self.op.vm_capable is False:
      (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
      if ipri or isec:
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
                                   " the vm_capable flag" % node.name,
                                   errors.ECODE_STATE)

    if node.master_candidate and self.might_demote and not self.lock_all:
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
      # check if after removing the current node, we're missing master
      # candidates
      (mc_remaining, mc_should, _) = \
          self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
      if mc_remaining < mc_should:
        raise errors.OpPrereqError("Not enough master candidates, please"
                                   " pass auto promote option to allow"
                                   " promotion (--auto-promote or RAPI"
                                   " auto_promote=True)", errors.ECODE_STATE)

    self.old_flags = old_flags = (node.master_candidate,
                                  node.drained, node.offline)
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
    self.old_role = old_role = self._F2R[old_flags]

    # Check for ineffective changes
    for attr in self._FLAGS:
      if getattr(self.op, attr) is False and getattr(node, attr) is False:
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
        setattr(self.op, attr, None)

    # Past this point, any flag change to False means a transition
    # away from the respective state, as only real changes are kept

    # TODO: We might query the real power state if it supports OOB
    if SupportsOob(self.cfg, node):
      if self.op.offline is False and not (node.powered or
                                           self.op.powered is True):
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
                                    " offline status can be reset") %
                                   self.op.node_name, errors.ECODE_STATE)
    elif self.op.powered is not None:
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
                                  " as it does not support out-of-band"
                                  " handling") % self.op.node_name,
                                 errors.ECODE_STATE)

    # If we're being deofflined/drained, we'll MC ourself if needed
    if (self.op.drained is False or self.op.offline is False or
        (self.op.master_capable and not node.master_capable)):
      if _DecideSelfPromotion(self):
        self.op.master_candidate = True
        self.LogInfo("Auto-promoting node to master candidate")

    # If we're no longer master capable, we'll demote ourselves from MC
    if self.op.master_capable is False and node.master_candidate:
      self.LogInfo("Demoting from master candidate")
      self.op.master_candidate = False

    # Compute new role
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
    if self.op.master_candidate:
      new_role = self._ROLE_CANDIDATE
    elif self.op.drained:
      new_role = self._ROLE_DRAINED
    elif self.op.offline:
      new_role = self._ROLE_OFFLINE
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
      # False is still in new flags, which means we're un-setting (the
      # only) True flag
      new_role = self._ROLE_REGULAR
    else: # no new flags, nothing, keep old role
      new_role = old_role

    self.new_role = new_role

    if old_role == self._ROLE_OFFLINE and new_role != old_role:
      # Trying to transition out of offline status
      result = self.rpc.call_version([node.uuid])[node.uuid]
      if result.fail_msg:
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
                                   " to report its version: %s" %
                                   (node.name, result.fail_msg),
                                   errors.ECODE_STATE)
      else:
        self.LogWarning("Transitioning node from offline to online state"
                        " without using re-add. Please make sure the node"
                        " is healthy!")

    # When changing the secondary ip, verify if this is a single-homed to
    # multi-homed transition or vice versa, and apply the relevant
    # restrictions.
    if self.op.secondary_ip:
      # Ok even without locking, because this can't be changed by any LU
      master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
      master_singlehomed = master.secondary_ip == master.primary_ip
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from single-homed to multi-homed"
                          " cluster; all nodes will require a secondary IP"
                          " address")
        else:
          raise errors.OpPrereqError("Changing the secondary ip on a"
                                     " single-homed cluster requires the"
                                     " --force option to be passed, and the"
                                     " target node to be the master",
                                     errors.ECODE_INVAL)
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from multi-homed to single-homed"
                          " cluster; secondary IP addresses will have to be"
                          " removed")
        else:
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
                                     " same as the primary IP on a multi-homed"
                                     " cluster, unless the --force option is"
                                     " passed, and the target node is the"
                                     " master", errors.ECODE_INVAL)

      assert not (frozenset(affected_instances) -
                  self.owned_locks(locking.LEVEL_INSTANCE))

      if node.offline:
        if affected_instances:
          msg = ("Cannot change secondary IP address: offline node has"
                 " instances (%s) configured to use it" %
                 utils.CommaJoin(affected_instances.keys()))
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
      else:
        # On online nodes, check that no instances are running, and that
        # the node has the new ip and we can reach it.
        for instance in affected_instances.values():
          CheckInstanceState(self, instance, INSTANCE_DOWN,
                             msg="cannot change secondary ip")

        _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
        if master.uuid != node.uuid:
          # check reachability from master secondary ip to new secondary ip
          if not netutils.TcpPing(self.op.secondary_ip,
                                  constants.DEFAULT_NODED_PORT,
                                  source=master.secondary_ip):
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                       " based ping to node daemon port",
                                       errors.ECODE_ENVIRON)

    if self.op.ndparams:
      new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")
      self.new_ndparams = new_ndparams

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                                node.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    old_role = self.old_role
    new_role = self.new_role

    result = []

    if self.op.ndparams:
      node.ndparams = self.new_ndparams

    if self.op.powered is not None:
      node.powered = self.op.powered

    if self.op.hv_state:
      node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      node.disk_state_static = self.new_disk_state

    for attr in ["master_capable", "vm_capable"]:
      val = getattr(self.op, attr)
      if val is not None:
        setattr(node, attr, val)
        result.append((attr, str(val)))

    if new_role != old_role:
      # Tell the node to demote itself, if no longer MC and not offline
      if old_role == self._ROLE_CANDIDATE and new_role != self._ROLE_OFFLINE:
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s", msg)

      new_flags = self._R2F[new_role]
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
        if of != nf:
          result.append((desc, str(nf)))
      (node.master_candidate, node.drained, node.offline) = new_flags

      # we locked all nodes, we adjust the CP before updating this node
      if self.lock_all:
        AdjustCandidatePool(self, [node.uuid])

    if self.op.secondary_ip:
      node.secondary_ip = self.op.secondary_ip
      result.append(("secondary_ip", self.op.secondary_ip))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)

    # this will trigger job queue propagation or cleanup if the mc
    # flag changed
    if [old_role, new_role].count(self._ROLE_CANDIDATE) == 1:
      self.context.ReaddNode(node)

    return result


class LUNodePowercycle(NoHooksLU):
  """Powercycles a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    default_hypervisor = self.cfg.GetHypervisorType()
    hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
    result = self.rpc.call_node_powercycle(self.op.node_uuid,
                                           default_hypervisor,
                                           hvparams)
    result.Raise("Failed to schedule the reboot")
    return result.payload


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodePrimaryInstances(cfg, node_uuid):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_uuid):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid in inst.secondary_nodes)


def _GetNodeInstances(cfg, node_uuid):
  """Returns a list of all primary and secondary instances on a node.

  """

  return _GetNodeInstancesInner(cfg, lambda inst: node_uuid in inst.all_nodes)


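# The evacuation LU below accepts one of three modes (primary, secondary or
# all instances) and translates it into the matching iallocator request via
# _MODE2IALLOCATOR; evacuating "all" instances is currently rejected in
# _DetermineInstances() because the iallocator interface cannot yet express a
# per-instance evacuation mode.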
class LUNodeEvacuate(NoHooksLU):
  """Evacuates instances off a list of nodes.

  """
  REQ_BGL = False

  _MODE2IALLOCATOR = {
    constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI,
    constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC,
    constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL,
    }
  assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES
  assert (frozenset(_MODE2IALLOCATOR.values()) ==
          constants.IALLOCATOR_NEVAC_MODES)

  def CheckArguments(self):
    CheckIAllocatorOrNode(self, "iallocator", "remote_node")

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)
      assert self.op.remote_node

      if self.op.node_uuid == self.op.remote_node_uuid:
        raise errors.OpPrereqError("Can not use evacuated node as a new"
                                   " secondary node", errors.ECODE_INVAL)

      if self.op.mode != constants.NODE_EVAC_SEC:
        raise errors.OpPrereqError("Without the use of an iallocator only"
                                   " secondary instances can be evacuated",
                                   errors.ECODE_INVAL)

    # Declare locks
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

    # Determine nodes (via group) optimistically, needs verification once locks
    # have been acquired
    self.lock_nodes = self._DetermineNodes()

  def _DetermineNodes(self):
    """Gets the list of node UUIDs to operate on.

    """
    if self.op.remote_node is None:
      # Iallocator will choose any node(s) in the same group
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
    else:
      group_nodes = frozenset([self.op.remote_node_uuid])

    # Determine nodes to be locked
    return set([self.op.node_uuid]) | group_nodes

  def _DetermineInstances(self):
    """Builds list of instances to operate on.

    """
    assert self.op.mode in constants.NODE_EVAC_MODES

    if self.op.mode == constants.NODE_EVAC_PRI:
      # Primary instances only
      inst_fn = _GetNodePrimaryInstances
      assert self.op.remote_node is None, \
        "Evacuating primary instances requires iallocator"
    elif self.op.mode == constants.NODE_EVAC_SEC:
      # Secondary instances only
      inst_fn = _GetNodeSecondaryInstances
    else:
      # All instances
      assert self.op.mode == constants.NODE_EVAC_ALL
      inst_fn = _GetNodeInstances
      # TODO: In 2.6, change the iallocator interface to take an evacuation mode
      # per instance
      raise errors.OpPrereqError("Due to an issue with the iallocator"
                                 " interface it is not possible to evacuate"
                                 " all instances at once; specify explicitly"
                                 " whether to evacuate primary or secondary"
                                 " instances",
                                 errors.ECODE_INVAL)

    return inst_fn(self.cfg, self.op.node_uuid)

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        set(i.name for i in self._DetermineInstances())

    elif level == locking.LEVEL_NODEGROUP:
      # Lock node groups for all potential target nodes optimistically, needs
      # verification once nodes have been acquired
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)

    elif level == locking.LEVEL_NODE:
      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes

  def CheckPrereq(self):
    # Verify locks
    owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)

    need_nodes = self._DetermineNodes()

    if not owned_nodes.issuperset(need_nodes):
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
                                 " locks were acquired, current nodes"
                                 " are '%s', used to be '%s'; retry the"
                                 " operation" %
                                 (self.op.node_name,
                                  utils.CommaJoin(need_nodes),
                                  utils.CommaJoin(owned_nodes)),
                                 errors.ECODE_STATE)

    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
    if owned_groups != wanted_groups:
      raise errors.OpExecError("Node groups changed since locks were acquired,"
                               " current groups are '%s', used to be '%s';"
                               " retry the operation" %
                               (utils.CommaJoin(wanted_groups),
                                utils.CommaJoin(owned_groups)))

    # Determine affected instances
    self.instances = self._DetermineInstances()
    self.instance_names = [i.name for i in self.instances]

    if set(self.instance_names) != owned_instances:
      raise errors.OpExecError("Instances on node '%s' changed since locks"
                               " were acquired, current instances are '%s',"
                               " used to be '%s'; retry the operation" %
                               (self.op.node_name,
                                utils.CommaJoin(self.instance_names),
                                utils.CommaJoin(owned_instances)))

    if self.instance_names:
      self.LogInfo("Evacuating instances from node '%s': %s",
                   self.op.node_name,
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
    else:
      self.LogInfo("No instances to evacuate from node '%s'",
                   self.op.node_name)

    if self.op.remote_node is not None:
      for i in self.instances:
        if i.primary_node == self.op.remote_node_uuid:
          raise errors.OpPrereqError("Node %s is the primary node of"
                                     " instance %s, cannot use it as"
                                     " secondary" %
                                     (self.op.remote_node, i.name),
                                     errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)

    if not self.instance_names:
      # No instances to evacuate
      jobs = []

    elif self.op.iallocator is not None:
      # TODO: Implement relocation to other group
      evac_mode = self._MODE2IALLOCATOR[self.op.mode]
      req = iallocator.IAReqNodeEvac(evac_mode=evac_mode,
                                     instances=list(self.instance_names))
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)

      ial.Run(self.op.iallocator)

      if not ial.success:
        raise errors.OpPrereqError("Can't compute node evacuation using"
                                   " iallocator '%s': %s" %
                                   (self.op.iallocator, ial.info),
                                   errors.ECODE_NORES)

      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)

    elif self.op.remote_node is not None:
      assert self.op.mode == constants.NODE_EVAC_SEC
      jobs = [
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
                                        remote_node=self.op.remote_node,
                                        disks=[],
                                        mode=constants.REPLACE_DISK_CHG,
                                        early_release=self.op.early_release)]
        for instance_name in self.instance_names]

    else:
      raise errors.ProgrammerError("No iallocator or remote node")

    return ResultWithJobs(jobs)


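# The migration LU below does not migrate anything itself: Exec() builds one
# OpInstanceMigrate opcode per primary instance of the node and returns them
# through ResultWithJobs, so each migration is submitted as a separate job.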
class LUNodeMigrate(LogicalUnit):
1033
  """Migrate all instances from a node.
1034

1035
  """
1036
  HPATH = "node-migrate"
1037
  HTYPE = constants.HTYPE_NODE
1038
  REQ_BGL = False
1039

    
1040
  def CheckArguments(self):
1041
    pass
1042

    
1043
  def ExpandNames(self):
1044
    (self.op.node_uuid, self.op.node_name) = \
1045
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1046

    
1047
    self.share_locks = ShareAll()
1048
    self.needed_locks = {
1049
      locking.LEVEL_NODE: [self.op.node_uuid],
1050
      }
1051

    
1052
  def BuildHooksEnv(self):
1053
    """Build hooks env.
1054

1055
    This runs on the master, the primary and all the secondaries.
1056

1057
    """
1058
    return {
1059
      "NODE_NAME": self.op.node_name,
1060
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
1061
      }
1062

    
1063
  def BuildHooksNodes(self):
1064
    """Build hooks nodes.
1065

1066
    """
1067
    nl = [self.cfg.GetMasterNode()]
1068
    return (nl, nl)
1069

    
1070
  def CheckPrereq(self):
1071
    pass
1072

    
1073
  def Exec(self, feedback_fn):
1074
    # Prepare jobs for migration instances
1075
    allow_runtime_changes = self.op.allow_runtime_changes
1076
    jobs = [
1077
      [opcodes.OpInstanceMigrate(instance_name=inst.name,
1078
                                 mode=self.op.mode,
1079
                                 live=self.op.live,
1080
                                 iallocator=self.op.iallocator,
1081
                                 target_node=self.op.target_node,
1082
                                 allow_runtime_changes=allow_runtime_changes,
1083
                                 ignore_ipolicy=self.op.ignore_ipolicy)]
1084
      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]
1085

    
1086
    # TODO: Run iallocator in this opcode and pass correct placement options to
1087
    # OpInstanceMigrate. Since other jobs can modify the cluster between
1088
    # running the iallocator and the actual migration, a good consistency model
1089
    # will have to be found.
1090

    
1091
    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
1092
            frozenset([self.op.node_name]))
1093

    
1094
    return ResultWithJobs(jobs)
1095

    
1096

    
1097
def _GetStorageTypeArgs(cfg, storage_type):
1098
  """Returns the arguments for a storage type.
1099

1100
  """
1101
  # Special case for file storage
1102
  if storage_type == constants.ST_FILE:
1103
    # storage.FileStorage wants a list of storage directories
1104
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1105

    
1106
  return []
1107

    
1108

    
1109
class LUNodeModifyStorage(NoHooksLU):
1110
  """Logical unit for modifying a storage volume on a node.
1111

1112
  """
1113
  REQ_BGL = False
1114

    
1115
  def CheckArguments(self):
1116
    (self.op.node_uuid, self.op.node_name) = \
1117
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1118

    
1119
    storage_type = self.op.storage_type
1120

    
1121
    try:
1122
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
1123
    except KeyError:
1124
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
1125
                                 " modified" % storage_type,
1126
                                 errors.ECODE_INVAL)
1127

    
1128
    diff = set(self.op.changes.keys()) - modifiable
1129
    if diff:
1130
      raise errors.OpPrereqError("The following fields can not be modified for"
1131
                                 " storage units of type '%s': %r" %
1132
                                 (storage_type, list(diff)),
1133
                                 errors.ECODE_INVAL)
1134

    
1135
  def ExpandNames(self):
1136
    self.needed_locks = {
1137
      locking.LEVEL_NODE: self.op.node_uuid,
1138
      }
1139

    
1140
  def Exec(self, feedback_fn):
1141
    """Computes the list of nodes and their attributes.
1142

1143
    """
1144
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1145
    result = self.rpc.call_storage_modify(self.op.node_uuid,
1146
                                          self.op.storage_type, st_args,
1147
                                          self.op.name, self.op.changes)
1148
    result.Raise("Failed to modify storage unit '%s' on %s" %
1149
                 (self.op.name, self.op.node_name))
1150

    
1151

    
1152
class NodeQuery(QueryBase):
1153
  FIELDS = query.NODE_FIELDS
1154

    
1155
  def ExpandNames(self, lu):
1156
    lu.needed_locks = {}
1157
    lu.share_locks = ShareAll()
1158

    
1159
    if self.names:
1160
      (self.wanted, _) = GetWantedNodes(lu, self.names)
1161
    else:
1162
      self.wanted = locking.ALL_SET
1163

    
1164
    self.do_locking = (self.use_locking and
1165
                       query.NQ_LIVE in self.requested_data)
1166

    
1167
    if self.do_locking:
1168
      # If any non-static field is requested we need to lock the nodes
1169
      lu.needed_locks[locking.LEVEL_NODE] = self.wanted
1170
      lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1171

    
1172
  def DeclareLocks(self, lu, level):
1173
    pass
1174

    
1175
  def _GetQueryData(self, lu):
1176
    """Computes the list of nodes and their attributes.
1177

1178
    """
1179
    all_info = lu.cfg.GetAllNodesInfo()
1180

    
1181
    node_uuids = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)
1182

    
1183
    # Gather data as requested
1184
    if query.NQ_LIVE in self.requested_data:
1185
      # filter out non-vm_capable nodes
1186
      toquery_node_uuids = [node.uuid for node in all_info.values()
1187
                            if node.vm_capable and node.uuid in node_uuids]
1188

    
1189
      es_flags = rpc.GetExclusiveStorageForNodes(lu.cfg, toquery_node_uuids)
1190
      # FIXME: This currently maps everything to lvm, this should be more
1191
      # flexible
1192
      vg_req = rpc.BuildVgInfoQuery(lu.cfg)
1193
      default_hypervisor = lu.cfg.GetHypervisorType()
1194
      hvparams = lu.cfg.GetClusterInfo().hvparams[default_hypervisor]
1195
      hvspecs = [(default_hypervisor, hvparams)]
1196
      node_data = lu.rpc.call_node_info(toquery_node_uuids, vg_req,
1197
                                        hvspecs, es_flags)
1198
      live_data = dict((uuid, rpc.MakeLegacyNodeInfo(nresult.payload))
1199
                       for (uuid, nresult) in node_data.items()
1200
                       if not nresult.fail_msg and nresult.payload)
1201
    else:
1202
      live_data = None
1203

    
1204
    if query.NQ_INST in self.requested_data:
1205
      node_to_primary = dict([(uuid, set()) for uuid in node_uuids])
1206
      node_to_secondary = dict([(uuid, set()) for uuid in node_uuids])
1207

    
1208
      inst_data = lu.cfg.GetAllInstancesInfo()
1209

    
1210
      for inst in inst_data.values():
1211
        if inst.primary_node in node_to_primary:
1212
          node_to_primary[inst.primary_node].add(inst.name)
1213
        for secnode in inst.secondary_nodes:
1214
          if secnode in node_to_secondary:
1215
            node_to_secondary[secnode].add(inst.name)
1216
    else:
1217
      node_to_primary = None
1218
      node_to_secondary = None
1219

    
1220
    if query.NQ_OOB in self.requested_data:
1221
      oob_support = dict((uuid, bool(SupportsOob(lu.cfg, node)))
1222
                         for uuid, node in all_info.iteritems())
1223
    else:
1224
      oob_support = None
1225

    
1226
    if query.NQ_GROUP in self.requested_data:
1227
      groups = lu.cfg.GetAllNodeGroupsInfo()
1228
    else:
1229
      groups = {}
1230

    
1231
    return query.NodeQueryData([all_info[uuid] for uuid in node_uuids],
1232
                               live_data, lu.cfg.GetMasterNode(),
1233
                               node_to_primary, node_to_secondary, groups,
1234
                               oob_support, lu.cfg.GetClusterInfo())
1235

    
1236

    
1237
class LUNodeQuery(NoHooksLU):
1238
  """Logical unit for querying nodes.
1239

1240
  """
1241
  # pylint: disable=W0142
1242
  REQ_BGL = False
1243

    
1244
  def CheckArguments(self):
1245
    self.nq = NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
1246
                         self.op.output_fields, self.op.use_locking)
1247

    
1248
  def ExpandNames(self):
1249
    self.nq.ExpandNames(self)
1250

    
1251
  def DeclareLocks(self, level):
1252
    self.nq.DeclareLocks(self, level)
1253

    
1254
  def Exec(self, feedback_fn):
1255
    return self.nq.OldStyleQuery(self)
1256

    
1257

    
1258
def _CheckOutputFields(static, dynamic, selected):
1259
  """Checks whether all selected fields are valid.
1260

1261
  @type static: L{utils.FieldSet}
1262
  @param static: static fields set
1263
  @type dynamic: L{utils.FieldSet}
1264
  @param dynamic: dynamic fields set
1265

1266
  """
1267
  f = utils.FieldSet()
1268
  f.Extend(static)
1269
  f.Extend(dynamic)
1270

    
1271
  delta = f.NonMatching(selected)
1272
  if delta:
1273
    raise errors.OpPrereqError("Unknown output fields selected: %s"
1274
                               % ",".join(delta), errors.ECODE_INVAL)


class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    volumes = self.rpc.call_node_volumes(node_uuids)

    ilist = self.cfg.GetAllInstancesInfo()
    vol2inst = MapInstanceDisksToNodes(ilist.values())

    output = []
    for node_uuid in node_uuids:
      nresult = volumes[node_uuid]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue

      node_vols = sorted(nresult.payload,
                         key=operator.itemgetter("dev"))

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = self.cfg.GetNodeName(node_uuid)
          elif field == "phys":
            val = vol["dev"]
          elif field == "vg":
            val = vol["vg"]
          elif field == "name":
            val = vol["name"]
          elif field == "size":
            val = int(float(vol["size"]))
          elif field == "instance":
            val = vol2inst.get((node_uuid, vol["vg"] + "/" + vol["name"]), "-")
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output
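
# Sketch of the result shape (values invented for illustration): with
# output_fields=["node", "phys", "size"], each entry of "output" above is a
# list of strings such as ["node1.example.com", "/dev/xenvg/disk0.data",
# "10240"], one row per volume reported by each reachable node.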


class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      nresult = data[node_uuid]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s",
                        node_name, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node_name
          elif field == constants.SF_TYPE:
            val = self.op.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result
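
# Worked example of the field normalization above (storage type and values are
# hypothetical): with output_fields=["used", "node"], the RPC is issued for
# fields=["name", "used"] (SF_NAME is prepended for sorting, SF_NODE stripped
# because only this LU knows it), and the node name is re-inserted locally
# when building each output row.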


class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
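
  # For a hypothetical target "node3.example.com", the dict above becomes
  # {"OP_TARGET": "node3.example.com", "NODE_NAME": "node3.example.com"};
  # the hooks runner is assumed to export these to hook scripts as
  # GANETI_OP_TARGET and GANETI_NODE_NAME.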

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_uuid)
    except ValueError:
      pass
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    assert node is not None

    masternode = self.cfg.GetMasterNode()
    if node.uuid == masternode:
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)

    for instance_name, instance in self.cfg.GetAllInstancesInfo().items():
      if node.uuid in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first" % instance_name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # Promote nodes to master candidate as needed
    AdjustCandidatePool(self, exceptions=[node.uuid])
    self.context.RemoveNode(node)

    # Run post hooks on the node before it's removed
    RunPostHook(self, node.name)

    # we have to call this by name rather than by UUID, as the node is no
    # longer in the config
    result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node_uuid = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node_uuid,
                                              constants.ETC_HOSTS_REMOVE,
                                              node.name, None)
      result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)


class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    if (constants.SO_FIX_CONSISTENCY not in
        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " repaired" % storage_type,
                                 errors.ECODE_INVAL)
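    # Note: only storage types that list SO_FIX_CONSISTENCY in
    # constants.VALID_STORAGE_OPERATIONS can be repaired; at the time of
    # writing this is assumed to be just the LVM volume-group backend
    # ("lvm-vg"), so any other storage_type is rejected here.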

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def _CheckFaultyDisks(self, instance, node_uuid):
    """Ensure faulty disks abort the opcode or at least warn."""
    try:
      if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                 node_uuid, True):
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                   " node '%s'" %
                                   (instance.name,
                                    self.cfg.GetNodeName(node_uuid)),
                                   errors.ECODE_STATE)
    except errors.OpPrereqError, err:
      if self.op.ignore_consistency:
        self.LogWarning(str(err.args[0]))
      else:
        raise

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Check whether any instance on this node has faulty disks
    for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
      if not inst.disks_active:
        continue
      check_nodes = set(inst.all_nodes)
      check_nodes.discard(self.op.node_uuid)
      for inst_node_uuid in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_uuid)

  def Exec(self, feedback_fn):
    feedback_fn("Repairing storage unit '%s' on %s ..." %
                (self.op.name, self.op.node_name))

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_execute(self.op.node_uuid,
                                           self.op.storage_type, st_args,
                                           self.op.name,
                                           constants.SO_FIX_CONSISTENCY)
    result.Raise("Failed to repair storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))
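
# A minimal, assumed invocation path for LURepairNodeStorage: the opcode is
# normally reached from the command line, e.g.
#
#   gnt-node repair-storage node1.example.com lvm-vg xenvg
#
# which fills in op.node_name, op.storage_type and op.name; Exec then asks the
# node daemon to run the SO_FIX_CONSISTENCY operation (for LVM, something
# along the lines of "vgreduce --removemissing") on the named volume group.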