#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with nodes."""

import logging
import operator

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import qlang
from ganeti import query
from ganeti import rpc
from ganeti import utils
from ganeti.masterd import iallocator

from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
  ResultWithJobs
from ganeti.cmdlib.common import CheckParamsNotGlobal, \
  MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  IsExclusiveStorageEnabledNode, CheckNodePVs, \
  RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
  CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
  AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
  GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
  FindFaultyInstanceDisks, CheckStorageTypeEnabled


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max by one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should
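  # For example: with candidate_pool_size=10, mc_now=3 and mc_should=4, the
  # target becomes min(4 + 1, 10) = 5 and 3 < 5, so the new node promotes
  # itself to master candidate.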


def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: L{objects.Node}
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip,
  and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  # this can be called with a new node, which has no UUID yet, so perform the
  # RPC call using its name
  result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node.name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)
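# Both modes are used in this module: LUNodeSetParams.CheckPrereq calls this
# with prereq=True (a prerequisite check before changing a secondary IP),
# while LUNodeAdd.Exec calls it with prereq=False once the node is already
# being added.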


class LUNodeAdd(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _NFLAGS = ["master_capable", "vm_capable"]

  def CheckArguments(self):
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
    # validate/normalize the node name
    self.hostname = netutils.GetHostname(name=self.op.node_name,
                                         family=self.primary_ip_family)
    self.op.node_name = self.hostname.name

    if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
      raise errors.OpPrereqError("Cannot readd the master node",
                                 errors.ECODE_STATE)

    if self.op.readd and self.op.group:
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                 " being readded", errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    hook_nodes = self.cfg.GetNodeList()
    new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
    if new_node_info is not None:
      # Exclude added node
      hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))

    # add the new node as post hook node by name; it does not have a UUID yet
    return (hook_nodes, hook_nodes, [self.op.node_name, ])
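    # The third element of the tuple lists post-hook nodes by name instead of
    # UUID; as the comment above notes, the node being added is not in the
    # configuration yet and therefore has no UUID to refer to.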

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.hostname.name
    self.op.primary_ip = self.hostname.ip
    if self.op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using an IPv6 primary address, a valid"
                                   " IPv4 address must be given as secondary",
                                   errors.ECODE_INVAL)
      self.op.secondary_ip = self.op.primary_ip

    secondary_ip = self.op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip, errors.ECODE_INVAL)

    existing_node_info = self.cfg.GetNodeInfoByName(node_name)
    if not self.op.readd and existing_node_info is not None:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node_name, errors.ECODE_EXISTS)
    elif self.op.readd and existing_node_info is None:
      raise errors.OpPrereqError("Node %s is not in the configuration" %
                                 node_name, errors.ECODE_NOENT)

    self.changed_primary_ip = False

    for existing_node in self.cfg.GetAllNodesInfo().values():
      if self.op.readd and node_name == existing_node.name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != self.op.primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == self.op.primary_ip or
          existing_node.secondary_ip == self.op.primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # After this 'if' block, None is no longer a valid value for the
    # _capable op attributes
    if self.op.readd:
      assert existing_node_info is not None, \
        "Can't retrieve locked node %s" % node_name
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, getattr(existing_node_info, attr))
    else:
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, True)

    if self.op.readd and not self.op.vm_capable:
      pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % node_name,
                                   errors.ECODE_STATE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == self.op.primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                              source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [existing_node_info.uuid]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False

    if self.op.readd:
      self.new_node = existing_node_info
    else:
      node_group = self.cfg.LookupNodeGroup(self.op.group)
      self.new_node = objects.Node(name=node_name,
                                   primary_ip=self.op.primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=node_group, ndparams={})

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)

    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
    #       it a property on the base class.
    rpcrunner = rpc.DnsOnlyRunner()
    result = rpcrunner.call_version([node_name])[node_name]
    result.Raise("Can't get version information from node %s" % node_name)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node_name, result.payload)
    else:
      raise errors.OpPrereqError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.payload),
                                 errors.ECODE_ENVIRON)

    vg_name = self.cfg.GetVGName()
    if vg_name is not None:
      vparams = {constants.NV_PVLIST: [vg_name]}
      excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
      cname = self.cfg.GetClusterName()
      result = rpcrunner.call_node_verify_light(
          [node_name], vparams, cname,
          self.cfg.GetClusterInfo().hvparams)[node_name]
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
      if errmsgs:
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # We are adding a new node, so we assume it's powered
    self.new_node.powered = True

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      self.new_node.drained = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      self.new_node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        self.new_node.primary_ip = self.op.primary_ip

    # copy the master/vm_capable flags
    for attr in self._NFLAGS:
      setattr(self.new_node, attr, getattr(self.op, attr))

    # notify the user about any possible mc promotion
    if self.new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    if self.op.ndparams:
      self.new_node.ndparams = self.op.ndparams
    else:
      self.new_node.ndparams = {}

    if self.op.hv_state:
      self.new_node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.new_node.disk_state_static = self.new_disk_state

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(
                 master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
                 self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    if self.new_node.secondary_ip != self.new_node.primary_ip:
      _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
                               False)

    node_verifier_uuids = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: ([self.new_node.name], {}),
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(
               node_verifier_uuids, node_verify_param,
               self.cfg.GetClusterName(),
               self.cfg.GetClusterInfo().hvparams)
    for verifier in node_verifier_uuids:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed")

    if self.op.readd:
      self.context.ReaddNode(self.new_node)
      RedistributeAncillaryFiles(self)
      # make sure we redistribute the config
      self.cfg.Update(self.new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not self.new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
        result.Warn("Node failed to demote itself from master candidate status",
                    self.LogWarning)
    else:
      self.context.AddNode(self.new_node, self.proc.GetECId())
      RedistributeAncillaryFiles(self)
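    # Note: this LU implements the cluster's node-add operation; on the
    # command line it is normally driven by "gnt-node add", with the readd
    # path above covering the re-addition of a node that is already known.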


class LUNodeSetParams(LogicalUnit):
  """Modifies the parameters of a node.

  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
      to the node role (as _ROLE_*)
  @cvar _R2F: a dictionary from node role to tuples of flags
  @cvar _FLAGS: a list of attribute names corresponding to the flags

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
  _F2R = {
    (True, False, False): _ROLE_CANDIDATE,
    (False, True, False): _ROLE_DRAINED,
    (False, False, True): _ROLE_OFFLINE,
    (False, False, False): _ROLE_REGULAR,
    }
  _R2F = dict((v, k) for k, v in _F2R.items())
  _FLAGS = ["master_candidate", "drained", "offline"]
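  # Example of the mapping above: a node whose (master_candidate, drained,
  # offline) flags are (True, False, False) has role _ROLE_CANDIDATE, and
  # _R2F[_ROLE_CANDIDATE] yields (True, False, False) again. At most one of
  # the three flags is ever set; all-False means a regular node.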

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
                self.op.master_capable, self.op.vm_capable,
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
                self.op.disk_state]
    if all_mods.count(None) == len(all_mods):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # Boolean value that tells us whether we might be demoting from MC
    self.might_demote = (self.op.master_candidate is False or
                         self.op.offline is True or
                         self.op.drained is True or
                         self.op.master_capable is False)

    if self.op.secondary_ip:
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                   " address" % self.op.secondary_ip,
                                   errors.ECODE_INVAL)

    self.lock_all = self.op.auto_promote and self.might_demote
    self.lock_instances = self.op.secondary_ip is not None

  def _InstanceFilter(self, instance):
    """Filter for getting affected instances.

    """
    return (instance.disk_template in constants.DTS_INT_MIRROR and
            self.op.node_uuid in instance.all_nodes)
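    # Only instances using an internally mirrored disk template (e.g. DRBD)
    # on this node are affected when its secondary IP changes.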

  def ExpandNames(self):
    if self.lock_all:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,

        # Block allocations when all nodes are locked
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: self.op.node_uuid,
        }

    # Since modifying a node can have severe effects on currently running
    # operations the resource lock is at least acquired in shared mode
    self.needed_locks[locking.LEVEL_NODE_RES] = \
      self.needed_locks[locking.LEVEL_NODE]

    # Get all locks except nodes in shared mode; they are not used for anything
    # but read-only access
    self.share_locks = ShareAll()
    self.share_locks[locking.LEVEL_NODE] = 0
    self.share_locks[locking.LEVEL_NODE_RES] = 0
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 0

    if self.lock_instances:
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(
          self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    if self.lock_instances:
      affected_instances = \
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)

      # Verify instance locks
      owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
      wanted_instance_names = frozenset([inst.name for inst in
                                         affected_instances.values()])
      if wanted_instance_names - owned_instance_names:
        raise errors.OpPrereqError("Instances affected by changing node %s's"
                                   " secondary IP address have changed since"
                                   " locks were acquired, wanted '%s', have"
                                   " '%s'; retry the operation" %
                                   (node.name,
                                    utils.CommaJoin(wanted_instance_names),
                                    utils.CommaJoin(owned_instance_names)),
                                   errors.ECODE_STATE)
    else:
      affected_instances = None

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if node.uuid == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via master-failover",
                                   errors.ECODE_INVAL)

    if self.op.master_candidate and not node.master_capable:
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
                                 " it a master candidate" % node.name,
                                 errors.ECODE_STATE)

    if self.op.vm_capable is False:
      (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
      if ipri or isec:
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
                                   " the vm_capable flag" % node.name,
                                   errors.ECODE_STATE)

    if node.master_candidate and self.might_demote and not self.lock_all:
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
      # check if after removing the current node, we're missing master
      # candidates
      (mc_remaining, mc_should, _) = \
          self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
      if mc_remaining < mc_should:
        raise errors.OpPrereqError("Not enough master candidates, please"
                                   " pass auto promote option to allow"
                                   " promotion (--auto-promote or RAPI"
                                   " auto_promote=True)", errors.ECODE_STATE)

    self.old_flags = old_flags = (node.master_candidate,
                                  node.drained, node.offline)
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
    self.old_role = old_role = self._F2R[old_flags]

    # Check for ineffective changes
    for attr in self._FLAGS:
      if getattr(self.op, attr) is False and getattr(node, attr) is False:
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
        setattr(self.op, attr, None)

    # Past this point, any flag change to False means a transition
    # away from the respective state, as only real changes are kept

    # TODO: We might query the real power state if it supports OOB
    if SupportsOob(self.cfg, node):
      if self.op.offline is False and not (node.powered or
                                           self.op.powered is True):
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
                                    " offline status can be reset") %
                                   self.op.node_name, errors.ECODE_STATE)
    elif self.op.powered is not None:
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
                                  " as it does not support out-of-band"
                                  " handling") % self.op.node_name,
                                 errors.ECODE_STATE)

    # If we're being deofflined/drained, we'll MC ourself if needed
    if (self.op.drained is False or self.op.offline is False or
        (self.op.master_capable and not node.master_capable)):
      if _DecideSelfPromotion(self):
        self.op.master_candidate = True
        self.LogInfo("Auto-promoting node to master candidate")

    # If we're no longer master capable, we'll demote ourselves from MC
    if self.op.master_capable is False and node.master_candidate:
      self.LogInfo("Demoting from master candidate")
      self.op.master_candidate = False

    # Compute new role
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
    if self.op.master_candidate:
      new_role = self._ROLE_CANDIDATE
    elif self.op.drained:
      new_role = self._ROLE_DRAINED
    elif self.op.offline:
      new_role = self._ROLE_OFFLINE
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
      # False is still in new flags, which means we're un-setting (the
      # only) True flag
      new_role = self._ROLE_REGULAR
    else: # no new flags, nothing, keep old role
      new_role = old_role

    self.new_role = new_role
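    # Example: for a node that is currently offline (_ROLE_OFFLINE), a request
    # carrying only drained=True selects _ROLE_DRAINED above, whereas a
    # request carrying only offline=False falls through to _ROLE_REGULAR.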

    if old_role == self._ROLE_OFFLINE and new_role != old_role:
      # Trying to transition out of offline status
      result = self.rpc.call_version([node.uuid])[node.uuid]
      if result.fail_msg:
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
                                   " to report its version: %s" %
                                   (node.name, result.fail_msg),
                                   errors.ECODE_STATE)
      else:
        self.LogWarning("Transitioning node from offline to online state"
                        " without using re-add. Please make sure the node"
                        " is healthy!")

    # When changing the secondary ip, verify if this is a single-homed to
    # multi-homed transition or vice versa, and apply the relevant
    # restrictions.
    if self.op.secondary_ip:
      # Ok even without locking, because this can't be changed by any LU
      master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
      master_singlehomed = master.secondary_ip == master.primary_ip
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from single-homed to multi-homed"
                          " cluster; all nodes will require a secondary IP"
                          " address")
        else:
          raise errors.OpPrereqError("Changing the secondary ip on a"
                                     " single-homed cluster requires the"
                                     " --force option to be passed, and the"
                                     " target node to be the master",
                                     errors.ECODE_INVAL)
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from multi-homed to single-homed"
                          " cluster; secondary IP addresses will have to be"
                          " removed")
        else:
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
                                     " same as the primary IP on a multi-homed"
                                     " cluster, unless the --force option is"
                                     " passed, and the target node is the"
                                     " master", errors.ECODE_INVAL)

      assert not (set([inst.name for inst in affected_instances.values()]) -
                  self.owned_locks(locking.LEVEL_INSTANCE))

      if node.offline:
        if affected_instances:
          msg = ("Cannot change secondary IP address: offline node has"
                 " instances (%s) configured to use it" %
                 utils.CommaJoin(
                   [inst.name for inst in affected_instances.values()]))
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
      else:
        # On online nodes, check that no instances are running, and that
        # the node has the new ip and we can reach it.
        for instance in affected_instances.values():
          CheckInstanceState(self, instance, INSTANCE_DOWN,
                             msg="cannot change secondary ip")

        _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
        if master.uuid != node.uuid:
          # check reachability from master secondary ip to new secondary ip
          if not netutils.TcpPing(self.op.secondary_ip,
                                  constants.DEFAULT_NODED_PORT,
                                  source=master.secondary_ip):
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                       " based ping to node daemon port",
                                       errors.ECODE_ENVIRON)

    if self.op.ndparams:
      new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")
      self.new_ndparams = new_ndparams

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                                node.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    result = []

    if self.op.ndparams:
      node.ndparams = self.new_ndparams

    if self.op.powered is not None:
      node.powered = self.op.powered

    if self.op.hv_state:
      node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      node.disk_state_static = self.new_disk_state

    for attr in ["master_capable", "vm_capable"]:
      val = getattr(self.op, attr)
      if val is not None:
        setattr(node, attr, val)
        result.append((attr, str(val)))

    if self.new_role != self.old_role:
      # Tell the node to demote itself, if no longer MC and not offline
      if self.old_role == self._ROLE_CANDIDATE and \
          self.new_role != self._ROLE_OFFLINE:
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s", msg)

      new_flags = self._R2F[self.new_role]
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
        if of != nf:
          result.append((desc, str(nf)))
      (node.master_candidate, node.drained, node.offline) = new_flags

      # we locked all nodes, we adjust the CP before updating this node
      if self.lock_all:
        AdjustCandidatePool(self, [node.uuid])

    if self.op.secondary_ip:
      node.secondary_ip = self.op.secondary_ip
      result.append(("secondary_ip", self.op.secondary_ip))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)

    # this will trigger job queue propagation or cleanup if the mc
    # flag changed
    if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
      self.context.ReaddNode(node)

    return result
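    # The returned list of (parameter, new value) pairs becomes the job
    # result, e.g. [("master_candidate", "False"), ("secondary_ip",
    # "192.0.2.10")]; only parameters that actually changed are reported.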


class LUNodePowercycle(NoHooksLU):
  """Powercycles a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    default_hypervisor = self.cfg.GetHypervisorType()
    hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
    result = self.rpc.call_node_powercycle(self.op.node_uuid,
                                           default_hypervisor,
                                           hvparams)
    result.Raise("Failed to schedule the reboot")
    return result.payload


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodePrimaryInstances(cfg, node_uuid):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_uuid):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid in inst.secondary_nodes)


def _GetNodeInstances(cfg, node_uuid):
  """Returns a list of all primary and secondary instances on a node.

  """

  return _GetNodeInstancesInner(cfg, lambda inst: node_uuid in inst.all_nodes)


class LUNodeEvacuate(NoHooksLU):
  """Evacuates instances off a list of nodes.

  """
  REQ_BGL = False

  _MODE2IALLOCATOR = {
    constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI,
    constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC,
    constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL,
    }
  assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES
  assert (frozenset(_MODE2IALLOCATOR.values()) ==
          constants.IALLOCATOR_NEVAC_MODES)

  def CheckArguments(self):
    CheckIAllocatorOrNode(self, "iallocator", "remote_node")

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)
      assert self.op.remote_node

      if self.op.node_uuid == self.op.remote_node_uuid:
        raise errors.OpPrereqError("Can not use evacuated node as a new"
                                   " secondary node", errors.ECODE_INVAL)

      if self.op.mode != constants.NODE_EVAC_SEC:
        raise errors.OpPrereqError("Without the use of an iallocator only"
                                   " secondary instances can be evacuated",
                                   errors.ECODE_INVAL)

    # Declare locks
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

    # Determine nodes (via group) optimistically, needs verification once locks
    # have been acquired
    self.lock_nodes = self._DetermineNodes()

  def _DetermineNodes(self):
    """Gets the list of node UUIDs to operate on.

    """
    if self.op.remote_node is None:
      # Iallocator will choose any node(s) in the same group
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
    else:
      group_nodes = frozenset([self.op.remote_node_uuid])

    # Determine nodes to be locked
    return set([self.op.node_uuid]) | group_nodes
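    # This set is computed optimistically in ExpandNames, before any locks are
    # held, and recomputed in CheckPrereq; if the two differ, the operation
    # aborts and asks the user to retry.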

  def _DetermineInstances(self):
    """Builds list of instances to operate on.

    """
    assert self.op.mode in constants.NODE_EVAC_MODES

    if self.op.mode == constants.NODE_EVAC_PRI:
      # Primary instances only
      inst_fn = _GetNodePrimaryInstances
      assert self.op.remote_node is None, \
        "Evacuating primary instances requires iallocator"
    elif self.op.mode == constants.NODE_EVAC_SEC:
      # Secondary instances only
      inst_fn = _GetNodeSecondaryInstances
    else:
      # All instances
      assert self.op.mode == constants.NODE_EVAC_ALL
      inst_fn = _GetNodeInstances
      # TODO: In 2.6, change the iallocator interface to take an evacuation mode
      # per instance
      raise errors.OpPrereqError("Due to an issue with the iallocator"
                                 " interface it is not possible to evacuate"
                                 " all instances at once; specify explicitly"
                                 " whether to evacuate primary or secondary"
                                 " instances",
                                 errors.ECODE_INVAL)

    return inst_fn(self.cfg, self.op.node_uuid)

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        set(i.name for i in self._DetermineInstances())

    elif level == locking.LEVEL_NODEGROUP:
      # Lock node groups for all potential target nodes optimistically, needs
      # verification once nodes have been acquired
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)

    elif level == locking.LEVEL_NODE:
      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes

  def CheckPrereq(self):
    # Verify locks
    owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)

    need_nodes = self._DetermineNodes()

    if not owned_nodes.issuperset(need_nodes):
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
                                 " locks were acquired, current nodes are"
                                 " '%s', used to be '%s'; retry the"
                                 " operation" %
                                 (self.op.node_name,
                                  utils.CommaJoin(need_nodes),
                                  utils.CommaJoin(owned_nodes)),
                                 errors.ECODE_STATE)

    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
    if owned_groups != wanted_groups:
      raise errors.OpExecError("Node groups changed since locks were acquired,"
                               " current groups are '%s', used to be '%s';"
                               " retry the operation" %
                               (utils.CommaJoin(wanted_groups),
                                utils.CommaJoin(owned_groups)))

    # Determine affected instances
    self.instances = self._DetermineInstances()
    self.instance_names = [i.name for i in self.instances]

    if set(self.instance_names) != owned_instance_names:
      raise errors.OpExecError("Instances on node '%s' changed since locks"
                               " were acquired, current instances are '%s',"
                               " used to be '%s'; retry the operation" %
                               (self.op.node_name,
                                utils.CommaJoin(self.instance_names),
                                utils.CommaJoin(owned_instance_names)))

    if self.instance_names:
      self.LogInfo("Evacuating instances from node '%s': %s",
                   self.op.node_name,
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
    else:
      self.LogInfo("No instances to evacuate from node '%s'",
                   self.op.node_name)

    if self.op.remote_node is not None:
      for i in self.instances:
        if i.primary_node == self.op.remote_node_uuid:
          raise errors.OpPrereqError("Node %s is the primary node of"
                                     " instance %s, cannot use it as"
                                     " secondary" %
                                     (self.op.remote_node, i.name),
                                     errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)

    if not self.instance_names:
      # No instances to evacuate
      jobs = []

    elif self.op.iallocator is not None:
      # TODO: Implement relocation to other group
      evac_mode = self._MODE2IALLOCATOR[self.op.mode]
      req = iallocator.IAReqNodeEvac(evac_mode=evac_mode,
                                     instances=list(self.instance_names))
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)

      ial.Run(self.op.iallocator)

      if not ial.success:
        raise errors.OpPrereqError("Can't compute node evacuation using"
                                   " iallocator '%s': %s" %
                                   (self.op.iallocator, ial.info),
                                   errors.ECODE_NORES)

      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)

    elif self.op.remote_node is not None:
      assert self.op.mode == constants.NODE_EVAC_SEC
      jobs = [
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
                                        remote_node=self.op.remote_node,
                                        disks=[],
                                        mode=constants.REPLACE_DISK_CHG,
                                        early_release=self.op.early_release)]
        for instance_name in self.instance_names]

    else:
      raise errors.ProgrammerError("No iallocator or remote node")

    return ResultWithJobs(jobs)
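    # Wrapping the job definitions in ResultWithJobs hands them back to the
    # job queue for submission, so the actual evacuation runs as separate
    # per-instance jobs after this LU returns.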


class LUNodeMigrate(LogicalUnit):
  """Migrate all instances from a node.

  """
  HPATH = "node-migrate"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False

  def CheckArguments(self):
    pass

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    return {
      "NODE_NAME": self.op.node_name,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()]
    return (nl, nl)

  def CheckPrereq(self):
    pass

  def Exec(self, feedback_fn):
    # Prepare jobs for migration instances
    jobs = [
      [opcodes.OpInstanceMigrate(
        instance_name=inst.name,
        mode=self.op.mode,
        live=self.op.live,
        iallocator=self.op.iallocator,
        target_node=self.op.target_node,
        allow_runtime_changes=self.op.allow_runtime_changes,
        ignore_ipolicy=self.op.ignore_ipolicy)]
      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]

    # TODO: Run iallocator in this opcode and pass correct placement options to
    # OpInstanceMigrate. Since other jobs can modify the cluster between
    # running the iallocator and the actual migration, a good consistency model
    # will have to be found.

    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset([self.op.node_uuid]))

    return ResultWithJobs(jobs)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]

  return []


class LUNodeModifyStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_uuid,
      }

  def Exec(self, feedback_fn):
    """Modifies a storage volume on a node.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_uuid,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


class NodeQuery(QueryBase):
  FIELDS = query.NODE_FIELDS

  def ExpandNames(self, lu):
    lu.needed_locks = {}
    lu.share_locks = ShareAll()

    if self.names:
      (self.wanted, _) = GetWantedNodes(lu, self.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = (self.use_locking and
                       query.NQ_LIVE in self.requested_data)

    if self.do_locking:
      # If any non-static field is requested we need to lock the nodes
      lu.needed_locks[locking.LEVEL_NODE] = self.wanted
      lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    all_info = lu.cfg.GetAllNodesInfo()

    node_uuids = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)

    # Gather data as requested
    if query.NQ_LIVE in self.requested_data:
      # filter out non-vm_capable nodes
      toquery_node_uuids = [node.uuid for node in all_info.values()
                            if node.vm_capable and node.uuid in node_uuids]
      lvm_enabled = utils.storage.IsLvmEnabled(
          lu.cfg.GetClusterInfo().enabled_disk_templates)
      # FIXME: this per default asks for storage space information for all
      # enabled disk templates. Fix this by making it possible to specify
      # space report fields for specific disk templates.
      raw_storage_units = utils.storage.GetStorageUnitsOfCluster(
          lu.cfg, include_spindles=lvm_enabled)
      storage_units = rpc.PrepareStorageUnitsForNodes(
          lu.cfg, raw_storage_units, toquery_node_uuids)
      default_hypervisor = lu.cfg.GetHypervisorType()
      hvparams = lu.cfg.GetClusterInfo().hvparams[default_hypervisor]
      hvspecs = [(default_hypervisor, hvparams)]
      node_data = lu.rpc.call_node_info(toquery_node_uuids, storage_units,
                                        hvspecs)
      live_data = dict(
          (uuid, rpc.MakeLegacyNodeInfo(nresult.payload,
                                        require_spindles=lvm_enabled))
          for (uuid, nresult) in node_data.items()
          if not nresult.fail_msg and nresult.payload)
    else:
      live_data = None

    if query.NQ_INST in self.requested_data:
      node_to_primary = dict([(uuid, set()) for uuid in node_uuids])
      node_to_secondary = dict([(uuid, set()) for uuid in node_uuids])

      inst_data = lu.cfg.GetAllInstancesInfo()
      inst_uuid_to_inst_name = {}

      for inst in inst_data.values():
        inst_uuid_to_inst_name[inst.uuid] = inst.name
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.uuid)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.uuid)
    else:
      node_to_primary = None
      node_to_secondary = None
      inst_uuid_to_inst_name = None

    if query.NQ_OOB in self.requested_data:
      oob_support = dict((uuid, bool(SupportsOob(lu.cfg, node)))
                         for uuid, node in all_info.iteritems())
    else:
      oob_support = None

    if query.NQ_GROUP in self.requested_data:
      groups = lu.cfg.GetAllNodeGroupsInfo()
    else:
      groups = {}

    return query.NodeQueryData([all_info[uuid] for uuid in node_uuids],
                               live_data, lu.cfg.GetMasterNode(),
                               node_to_primary, node_to_secondary,
                               inst_uuid_to_inst_name, groups, oob_support,
                               lu.cfg.GetClusterInfo())


class LUNodeQuery(NoHooksLU):
  """Logical unit for querying nodes.

  """
  # pylint: disable=W0142
  REQ_BGL = False

  def CheckArguments(self):
    self.nq = NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
                        self.op.output_fields, self.op.use_locking)

  def ExpandNames(self):
    self.nq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.nq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    return self.nq.OldStyleQuery(self)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)
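# For instance, LUNodeQueryvols below declares "node" as its only static field
# and "phys", "vg", "name", "size" and "instance" as dynamic fields; selecting
# an unknown field such as "foo" makes this check fail with ECODE_INVAL.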
1289

    
1290

    
1291
class LUNodeQueryvols(NoHooksLU):
1292
  """Logical unit for getting volumes on node(s).
1293

1294
  """
1295
  REQ_BGL = False
1296
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1297
  _FIELDS_STATIC = utils.FieldSet("node")
1298

    
1299
  def CheckArguments(self):
1300
    _CheckOutputFields(static=self._FIELDS_STATIC,
1301
                       dynamic=self._FIELDS_DYNAMIC,
1302
                       selected=self.op.output_fields)
1303

    
1304
  def ExpandNames(self):
1305
    self.share_locks = ShareAll()
1306

    
1307
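    # Lock only the requested nodes; with no node names given, lock all
    # nodes together with the node allocation lock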
    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    volumes = self.rpc.call_node_volumes(node_uuids)

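    # Map (node UUID, LV name) pairs to their owning instance so the
    # "instance" output field can be filled in per volume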
    ilist = self.cfg.GetAllInstancesInfo()
    vol2inst = MapInstanceLvsToNodes(ilist.values())

    output = []
    for node_uuid in node_uuids:
      nresult = volumes[node_uuid]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue

      node_vols = sorted(nresult.payload,
                         key=operator.itemgetter("dev"))

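      # Build one output row per volume, following the order of the
      # requested output fields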
      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = self.cfg.GetNodeName(node_uuid)
          elif field == "phys":
            val = vol["dev"]
          elif field == "vg":
            val = vol["vg"]
          elif field == "name":
            val = vol["name"]
          elif field == "size":
            val = int(float(vol["size"]))
          elif field == "instance":
            inst = vol2inst.get((node_uuid, vol["vg"] + "/" + vol["name"]),
                                None)
            if inst is not None:
              val = inst.name
            else:
              val = "-"
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

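    # Ask every locked node for its storage units of the requested type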
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)

    result = []

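    # Assemble the result rows node by node (nice-sorted by node UUID), with
    # storage units sorted by name within each node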
    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      nresult = data[node_uuid]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s",
                        node_name, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node_name
          elif field == constants.SF_TYPE:
            val = self.op.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result


class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_uuid)
    except ValueError:
      pass
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    assert node is not None

    masternode = self.cfg.GetMasterNode()
    if node.uuid == masternode:
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)

    for _, instance in self.cfg.GetAllInstancesInfo().items():
      if node.uuid in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first" % instance.name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    logging.info("Stopping the node daemon and removing configs from node %s",
                 self.node.name)

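    # Whether the cluster manages the nodes' SSH setup; passed on to the
    # node_leave_cluster RPC below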
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # Promote nodes to master candidate as needed
    AdjustCandidatePool(self, exceptions=[self.node.uuid])
    self.context.RemoveNode(self.node)

    # Run post hooks on the node before it's removed
    RunPostHook(self, self.node.name)

    # we have to call this by name rather than by UUID, as the node is no
    # longer in the config
    result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node_uuid = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node_uuid,
                                              constants.ETC_HOSTS_REMOVE,
                                              self.node.name, None)
      result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)


class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

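    # Only storage types supporting the "fix consistency" operation can be
    # repaired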
    if (constants.SO_FIX_CONSISTENCY not in
        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " repaired" % storage_type,
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def _CheckFaultyDisks(self, instance, node_uuid):
    """Ensure faulty disks abort the opcode or at least warn."""
    try:
      if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                 node_uuid, True):
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                   " node '%s'" %
                                   (instance.name,
                                    self.cfg.GetNodeName(node_uuid)),
                                   errors.ECODE_STATE)
    except errors.OpPrereqError, err:
      if self.op.ignore_consistency:
        self.LogWarning(str(err.args[0]))
      else:
        raise

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

    # Check whether any instance on this node has faulty disks
    for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
      if not inst.disks_active:
        continue
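      # Check the instance's disks on all of its other nodes; the node being
      # repaired itself is skipped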
      check_nodes = set(inst.all_nodes)
      check_nodes.discard(self.op.node_uuid)
      for inst_node_uuid in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_uuid)

  def Exec(self, feedback_fn):
    feedback_fn("Repairing storage unit '%s' on %s ..." %
                (self.op.name, self.op.node_name))

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_execute(self.op.node_uuid,
                                           self.op.storage_type, st_args,
                                           self.op.name,
                                           constants.SO_FIX_CONSISTENCY)
    result.Raise("Failed to repair storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))