Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib / node.py @ 4f90370c

History | View | Annotate | Download (60.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with nodes."""
23

    
24
import logging
25
import operator
26

    
27
from ganeti import constants
28
from ganeti import errors
29
from ganeti import locking
30
from ganeti import netutils
31
from ganeti import objects
32
from ganeti import opcodes
33
from ganeti import qlang
34
from ganeti import query
35
from ganeti import rpc
36
from ganeti import utils
37
from ganeti.masterd import iallocator
38

    
39
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
40
  ResultWithJobs
41
from ganeti.cmdlib.common import CheckParamsNotGlobal, \
42
  MergeAndVerifyHvState, MergeAndVerifyDiskState, \
43
  IsExclusiveStorageEnabledNode, CheckNodePVs, \
44
  RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
45
  CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
46
  AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
47
  GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
48
  FindFaultyInstanceDisks, CheckStorageTypeEnabled
49

    
50

    
51
def _DecideSelfPromotion(lu, exceptions=None):
52
  """Decide whether I should promote myself as a master candidate.
53

54
  """
55
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
56
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
57
  # the new node will increase mc_max with one, so:
58
  mc_should = min(mc_should + 1, cp_size)
59
  return mc_now < mc_should
60

    
61

    
62
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
63
  """Ensure that a node has the given secondary ip.
64

65
  @type lu: L{LogicalUnit}
66
  @param lu: the LU on behalf of which we make the check
67
  @type node: L{objects.Node}
68
  @param node: the node to check
69
  @type secondary_ip: string
70
  @param secondary_ip: the ip to check
71
  @type prereq: boolean
72
  @param prereq: whether to throw a prerequisite or an execute error
73
  @raise errors.OpPrereqError: if the node doesn't have the ip,
74
  and prereq=True
75
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
76

77
  """
78
  # this can be called with a new node, which has no UUID yet, so perform the
79
  # RPC call using its name
80
  result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
81
  result.Raise("Failure checking secondary ip on node %s" % node.name,
82
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
83
  if not result.payload:
84
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
85
           " please fix and re-run this command" % secondary_ip)
86
    if prereq:
87
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
88
    else:
89
      raise errors.OpExecError(msg)
90

    
91

    
92
class LUNodeAdd(LogicalUnit):
93
  """Logical unit for adding node to the cluster.
94

95
  """
96
  HPATH = "node-add"
97
  HTYPE = constants.HTYPE_NODE
98
  _NFLAGS = ["master_capable", "vm_capable"]
99

    
100
  def CheckArguments(self):
101
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
102
    # validate/normalize the node name
103
    self.hostname = netutils.GetHostname(name=self.op.node_name,
104
                                         family=self.primary_ip_family)
105
    self.op.node_name = self.hostname.name
106

    
107
    if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
108
      raise errors.OpPrereqError("Cannot readd the master node",
109
                                 errors.ECODE_STATE)
110

    
111
    if self.op.readd and self.op.group:
112
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
113
                                 " being readded", errors.ECODE_INVAL)
114

    
115
    if self.op.ndparams:
116
      ovs = self.op.ndparams.get(constants.ND_OVS, None)
117
      ovs_name = self.op.ndparams.get(constants.ND_OVS_NAME, None)
118
      ovs_link = self.op.ndparams.get(constants.ND_OVS_LINK, None)
119

    
120
      # OpenvSwitch: Warn user if link is missing
121
      if ovs and not ovs_link:
122
        self.LogInfo("No physical interface for OpenvSwitch was given."
123
                     " OpenvSwitch will not have an outside connection. This"
124
                     " might not be what you want.")
125
      # OpenvSwitch: Fail if parameters are given, but OVS is not enabled.
126
      if not ovs and (ovs_name or ovs_link):
127
        raise errors.OpPrereqError("OpenvSwitch name or link were given, but"
128
                                   " OpenvSwitch is not enabled. Please enable"
129
                                   " OpenvSwitch with --ovs",
130
                                   errors.ECODE_INVAL)
131

    
132
  def BuildHooksEnv(self):
133
    """Build hooks env.
134

135
    This will run on all nodes before, and on all nodes + the new node after.
136

137
    """
138
    return {
139
      "OP_TARGET": self.op.node_name,
140
      "NODE_NAME": self.op.node_name,
141
      "NODE_PIP": self.op.primary_ip,
142
      "NODE_SIP": self.op.secondary_ip,
143
      "MASTER_CAPABLE": str(self.op.master_capable),
144
      "VM_CAPABLE": str(self.op.vm_capable),
145
      }
146

    
147
  def BuildHooksNodes(self):
148
    """Build hooks nodes.
149

150
    """
151
    hook_nodes = self.cfg.GetNodeList()
152
    new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
153
    if new_node_info is not None:
154
      # Exclude added node
155
      hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))
156

    
157
    # add the new node as post hook node by name; it does not have an UUID yet
158
    return (hook_nodes, hook_nodes, [self.op.node_name, ])
159

    
160
  def CheckPrereq(self):
161
    """Check prerequisites.
162

163
    This checks:
164
     - the new node is not already in the config
165
     - it is resolvable
166
     - its parameters (single/dual homed) matches the cluster
167

168
    Any errors are signaled by raising errors.OpPrereqError.
169

170
    """
171
    node_name = self.hostname.name
172
    self.op.primary_ip = self.hostname.ip
173
    if self.op.secondary_ip is None:
174
      if self.primary_ip_family == netutils.IP6Address.family:
175
        raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
176
                                   " IPv4 address must be given as secondary",
177
                                   errors.ECODE_INVAL)
178
      self.op.secondary_ip = self.op.primary_ip
179

    
180
    secondary_ip = self.op.secondary_ip
181
    if not netutils.IP4Address.IsValid(secondary_ip):
182
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
183
                                 " address" % secondary_ip, errors.ECODE_INVAL)
184

    
185
    existing_node_info = self.cfg.GetNodeInfoByName(node_name)
186
    if not self.op.readd and existing_node_info is not None:
187
      raise errors.OpPrereqError("Node %s is already in the configuration" %
188
                                 node_name, errors.ECODE_EXISTS)
189
    elif self.op.readd and existing_node_info is None:
190
      raise errors.OpPrereqError("Node %s is not in the configuration" %
191
                                 node_name, errors.ECODE_NOENT)
192

    
193
    self.changed_primary_ip = False
194

    
195
    for existing_node in self.cfg.GetAllNodesInfo().values():
196
      if self.op.readd and node_name == existing_node.name:
197
        if existing_node.secondary_ip != secondary_ip:
198
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
199
                                     " address configuration as before",
200
                                     errors.ECODE_INVAL)
201
        if existing_node.primary_ip != self.op.primary_ip:
202
          self.changed_primary_ip = True
203

    
204
        continue
205

    
206
      if (existing_node.primary_ip == self.op.primary_ip or
207
          existing_node.secondary_ip == self.op.primary_ip or
208
          existing_node.primary_ip == secondary_ip or
209
          existing_node.secondary_ip == secondary_ip):
210
        raise errors.OpPrereqError("New node ip address(es) conflict with"
211
                                   " existing node %s" % existing_node.name,
212
                                   errors.ECODE_NOTUNIQUE)
213

    
214
    # After this 'if' block, None is no longer a valid value for the
215
    # _capable op attributes
216
    if self.op.readd:
217
      assert existing_node_info is not None, \
218
        "Can't retrieve locked node %s" % node_name
219
      for attr in self._NFLAGS:
220
        if getattr(self.op, attr) is None:
221
          setattr(self.op, attr, getattr(existing_node_info, attr))
222
    else:
223
      for attr in self._NFLAGS:
224
        if getattr(self.op, attr) is None:
225
          setattr(self.op, attr, True)
226

    
227
    if self.op.readd and not self.op.vm_capable:
228
      pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
229
      if pri or sec:
230
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
231
                                   " flag set to false, but it already holds"
232
                                   " instances" % node_name,
233
                                   errors.ECODE_STATE)
234

    
235
    # check that the type of the node (single versus dual homed) is the
236
    # same as for the master
237
    myself = self.cfg.GetMasterNodeInfo()
238
    master_singlehomed = myself.secondary_ip == myself.primary_ip
239
    newbie_singlehomed = secondary_ip == self.op.primary_ip
240
    if master_singlehomed != newbie_singlehomed:
241
      if master_singlehomed:
242
        raise errors.OpPrereqError("The master has no secondary ip but the"
243
                                   " new node has one",
244
                                   errors.ECODE_INVAL)
245
      else:
246
        raise errors.OpPrereqError("The master has a secondary ip but the"
247
                                   " new node doesn't have one",
248
                                   errors.ECODE_INVAL)
249

    
250
    # checks reachability
251
    if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
252
      raise errors.OpPrereqError("Node not reachable by ping",
253
                                 errors.ECODE_ENVIRON)
254

    
255
    if not newbie_singlehomed:
256
      # check reachability from my secondary ip to newbie's secondary ip
257
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
258
                              source=myself.secondary_ip):
259
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
260
                                   " based ping to node daemon port",
261
                                   errors.ECODE_ENVIRON)
262

    
263
    if self.op.readd:
264
      exceptions = [existing_node_info.uuid]
265
    else:
266
      exceptions = []
267

    
268
    if self.op.master_capable:
269
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
270
    else:
271
      self.master_candidate = False
272

    
273
    if self.op.readd:
274
      self.new_node = existing_node_info
275
    else:
276
      node_group = self.cfg.LookupNodeGroup(self.op.group)
277
      self.new_node = objects.Node(name=node_name,
278
                                   primary_ip=self.op.primary_ip,
279
                                   secondary_ip=secondary_ip,
280
                                   master_candidate=self.master_candidate,
281
                                   offline=False, drained=False,
282
                                   group=node_group, ndparams={})
283

    
284
    if self.op.ndparams:
285
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
286
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
287
                           "node", "cluster or group")
288

    
289
    if self.op.hv_state:
290
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)
291

    
292
    if self.op.disk_state:
293
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)
294

    
295
    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
296
    #       it a property on the base class.
297
    rpcrunner = rpc.DnsOnlyRunner()
298
    result = rpcrunner.call_version([node_name])[node_name]
299
    result.Raise("Can't get version information from node %s" % node_name)
300
    if constants.PROTOCOL_VERSION == result.payload:
301
      logging.info("Communication to node %s fine, sw version %s match",
302
                   node_name, result.payload)
303
    else:
304
      raise errors.OpPrereqError("Version mismatch master version %s,"
305
                                 " node version %s" %
306
                                 (constants.PROTOCOL_VERSION, result.payload),
307
                                 errors.ECODE_ENVIRON)
308

    
309
    vg_name = self.cfg.GetVGName()
310
    if vg_name is not None:
311
      vparams = {constants.NV_PVLIST: [vg_name]}
312
      excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
313
      cname = self.cfg.GetClusterName()
314
      result = rpcrunner.call_node_verify_light(
315
          [node_name], vparams, cname,
316
          self.cfg.GetClusterInfo().hvparams)[node_name]
317
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
318
      if errmsgs:
319
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
320
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)
321

    
322
  def Exec(self, feedback_fn):
323
    """Adds the new node to the cluster.
324

325
    """
326
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
327
      "Not owning BGL"
328

    
329
    # We adding a new node so we assume it's powered
330
    self.new_node.powered = True
331

    
332
    # for re-adds, reset the offline/drained/master-candidate flags;
333
    # we need to reset here, otherwise offline would prevent RPC calls
334
    # later in the procedure; this also means that if the re-add
335
    # fails, we are left with a non-offlined, broken node
336
    if self.op.readd:
337
      self.new_node.drained = False
338
      self.LogInfo("Readding a node, the offline/drained flags were reset")
339
      # if we demote the node, we do cleanup later in the procedure
340
      self.new_node.master_candidate = self.master_candidate
341
      if self.changed_primary_ip:
342
        self.new_node.primary_ip = self.op.primary_ip
343

    
344
    # copy the master/vm_capable flags
345
    for attr in self._NFLAGS:
346
      setattr(self.new_node, attr, getattr(self.op, attr))
347

    
348
    # notify the user about any possible mc promotion
349
    if self.new_node.master_candidate:
350
      self.LogInfo("Node will be a master candidate")
351

    
352
    if self.op.ndparams:
353
      self.new_node.ndparams = self.op.ndparams
354
    else:
355
      self.new_node.ndparams = {}
356

    
357
    if self.op.hv_state:
358
      self.new_node.hv_state_static = self.new_hv_state
359

    
360
    if self.op.disk_state:
361
      self.new_node.disk_state_static = self.new_disk_state
362

    
363
    # Add node to our /etc/hosts, and add key to known_hosts
364
    if self.cfg.GetClusterInfo().modify_etc_hosts:
365
      master_node = self.cfg.GetMasterNode()
366
      result = self.rpc.call_etc_hosts_modify(
367
                 master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
368
                 self.hostname.ip)
369
      result.Raise("Can't update hosts file with new host data")
370

    
371
    if self.new_node.secondary_ip != self.new_node.primary_ip:
372
      _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
373
                               False)
374

    
375
    node_verifier_uuids = [self.cfg.GetMasterNode()]
376
    node_verify_param = {
377
      constants.NV_NODELIST: ([self.new_node.name], {}),
378
      # TODO: do a node-net-test as well?
379
    }
380

    
381
    result = self.rpc.call_node_verify(
382
               node_verifier_uuids, node_verify_param,
383
               self.cfg.GetClusterName(),
384
               self.cfg.GetClusterInfo().hvparams)
385
    for verifier in node_verifier_uuids:
386
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
387
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
388
      if nl_payload:
389
        for failed in nl_payload:
390
          feedback_fn("ssh/hostname verification failed"
391
                      " (checking from %s): %s" %
392
                      (verifier, nl_payload[failed]))
393
        raise errors.OpExecError("ssh/hostname verification failed")
394

    
395
    # OpenvSwitch initialization on the node
396
    ovs = self.new_node.ndparams.get(constants.ND_OVS, None)
397
    ovs_name = self.new_node.ndparams.get(constants.ND_OVS_NAME, None)
398
    ovs_link = self.new_node.ndparams.get(constants.ND_OVS_LINK, None)
399

    
400
    if ovs:
401
      result = self.rpc.call_node_configure_ovs(
402
                 self.new_node.name, ovs_name, ovs_link)
403

    
404
    if self.op.readd:
405
      self.context.ReaddNode(self.new_node)
406
      RedistributeAncillaryFiles(self)
407
      # make sure we redistribute the config
408
      self.cfg.Update(self.new_node, feedback_fn)
409
      # and make sure the new node will not have old files around
410
      if not self.new_node.master_candidate:
411
        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
412
        result.Warn("Node failed to demote itself from master candidate status",
413
                    self.LogWarning)
414
    else:
415
      self.context.AddNode(self.new_node, self.proc.GetECId())
416
      RedistributeAncillaryFiles(self)
417

    
418

    
419
class LUNodeSetParams(LogicalUnit):
420
  """Modifies the parameters of a node.
421

422
  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
423
      to the node role (as _ROLE_*)
424
  @cvar _R2F: a dictionary from node role to tuples of flags
425
  @cvar _FLAGS: a list of attribute names corresponding to the flags
426

427
  """
428
  HPATH = "node-modify"
429
  HTYPE = constants.HTYPE_NODE
430
  REQ_BGL = False
431
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
432
  _F2R = {
433
    (True, False, False): _ROLE_CANDIDATE,
434
    (False, True, False): _ROLE_DRAINED,
435
    (False, False, True): _ROLE_OFFLINE,
436
    (False, False, False): _ROLE_REGULAR,
437
    }
438
  _R2F = dict((v, k) for k, v in _F2R.items())
439
  _FLAGS = ["master_candidate", "drained", "offline"]
440

    
441
  def CheckArguments(self):
442
    (self.op.node_uuid, self.op.node_name) = \
443
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
444
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
445
                self.op.master_capable, self.op.vm_capable,
446
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
447
                self.op.disk_state]
448
    if all_mods.count(None) == len(all_mods):
449
      raise errors.OpPrereqError("Please pass at least one modification",
450
                                 errors.ECODE_INVAL)
451
    if all_mods.count(True) > 1:
452
      raise errors.OpPrereqError("Can't set the node into more than one"
453
                                 " state at the same time",
454
                                 errors.ECODE_INVAL)
455

    
456
    # Boolean value that tells us whether we might be demoting from MC
457
    self.might_demote = (self.op.master_candidate is False or
458
                         self.op.offline is True or
459
                         self.op.drained is True or
460
                         self.op.master_capable is False)
461

    
462
    if self.op.secondary_ip:
463
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
464
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
465
                                   " address" % self.op.secondary_ip,
466
                                   errors.ECODE_INVAL)
467

    
468
    self.lock_all = self.op.auto_promote and self.might_demote
469
    self.lock_instances = self.op.secondary_ip is not None
470

    
471
  def _InstanceFilter(self, instance):
472
    """Filter for getting affected instances.
473

474
    """
475
    return (instance.disk_template in constants.DTS_INT_MIRROR and
476
            self.op.node_uuid in instance.all_nodes)
477

    
478
  def ExpandNames(self):
479
    if self.lock_all:
480
      self.needed_locks = {
481
        locking.LEVEL_NODE: locking.ALL_SET,
482

    
483
        # Block allocations when all nodes are locked
484
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
485
        }
486
    else:
487
      self.needed_locks = {
488
        locking.LEVEL_NODE: self.op.node_uuid,
489
        }
490

    
491
    # Since modifying a node can have severe effects on currently running
492
    # operations the resource lock is at least acquired in shared mode
493
    self.needed_locks[locking.LEVEL_NODE_RES] = \
494
      self.needed_locks[locking.LEVEL_NODE]
495

    
496
    # Get all locks except nodes in shared mode; they are not used for anything
497
    # but read-only access
498
    self.share_locks = ShareAll()
499
    self.share_locks[locking.LEVEL_NODE] = 0
500
    self.share_locks[locking.LEVEL_NODE_RES] = 0
501
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 0
502

    
503
    if self.lock_instances:
504
      self.needed_locks[locking.LEVEL_INSTANCE] = \
505
        self.cfg.GetInstanceNames(
506
          self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())
507

    
508
  def BuildHooksEnv(self):
509
    """Build hooks env.
510

511
    This runs on the master node.
512

513
    """
514
    return {
515
      "OP_TARGET": self.op.node_name,
516
      "MASTER_CANDIDATE": str(self.op.master_candidate),
517
      "OFFLINE": str(self.op.offline),
518
      "DRAINED": str(self.op.drained),
519
      "MASTER_CAPABLE": str(self.op.master_capable),
520
      "VM_CAPABLE": str(self.op.vm_capable),
521
      }
522

    
523
  def BuildHooksNodes(self):
524
    """Build hooks nodes.
525

526
    """
527
    nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
528
    return (nl, nl)
529

    
530
  def CheckPrereq(self):
531
    """Check prerequisites.
532

533
    This only checks the instance list against the existing names.
534

535
    """
536
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
537
    if self.lock_instances:
538
      affected_instances = \
539
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
540

    
541
      # Verify instance locks
542
      owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
543
      wanted_instance_names = frozenset([inst.name for inst in
544
                                         affected_instances.values()])
545
      if wanted_instance_names - owned_instance_names:
546
        raise errors.OpPrereqError("Instances affected by changing node %s's"
547
                                   " secondary IP address have changed since"
548
                                   " locks were acquired, wanted '%s', have"
549
                                   " '%s'; retry the operation" %
550
                                   (node.name,
551
                                    utils.CommaJoin(wanted_instance_names),
552
                                    utils.CommaJoin(owned_instance_names)),
553
                                   errors.ECODE_STATE)
554
    else:
555
      affected_instances = None
556

    
557
    if (self.op.master_candidate is not None or
558
        self.op.drained is not None or
559
        self.op.offline is not None):
560
      # we can't change the master's node flags
561
      if node.uuid == self.cfg.GetMasterNode():
562
        raise errors.OpPrereqError("The master role can be changed"
563
                                   " only via master-failover",
564
                                   errors.ECODE_INVAL)
565

    
566
    if self.op.master_candidate and not node.master_capable:
567
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
568
                                 " it a master candidate" % node.name,
569
                                 errors.ECODE_STATE)
570

    
571
    if self.op.vm_capable is False:
572
      (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
573
      if ipri or isec:
574
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
575
                                   " the vm_capable flag" % node.name,
576
                                   errors.ECODE_STATE)
577

    
578
    if node.master_candidate and self.might_demote and not self.lock_all:
579
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
580
      # check if after removing the current node, we're missing master
581
      # candidates
582
      (mc_remaining, mc_should, _) = \
583
          self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
584
      if mc_remaining < mc_should:
585
        raise errors.OpPrereqError("Not enough master candidates, please"
586
                                   " pass auto promote option to allow"
587
                                   " promotion (--auto-promote or RAPI"
588
                                   " auto_promote=True)", errors.ECODE_STATE)
589

    
590
    self.old_flags = old_flags = (node.master_candidate,
591
                                  node.drained, node.offline)
592
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
593
    self.old_role = old_role = self._F2R[old_flags]
594

    
595
    # Check for ineffective changes
596
    for attr in self._FLAGS:
597
      if getattr(self.op, attr) is False and getattr(node, attr) is False:
598
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
599
        setattr(self.op, attr, None)
600

    
601
    # Past this point, any flag change to False means a transition
602
    # away from the respective state, as only real changes are kept
603

    
604
    # TODO: We might query the real power state if it supports OOB
605
    if SupportsOob(self.cfg, node):
606
      if self.op.offline is False and not (node.powered or
607
                                           self.op.powered is True):
608
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
609
                                    " offline status can be reset") %
610
                                   self.op.node_name, errors.ECODE_STATE)
611
    elif self.op.powered is not None:
612
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
613
                                  " as it does not support out-of-band"
614
                                  " handling") % self.op.node_name,
615
                                 errors.ECODE_STATE)
616

    
617
    # If we're being deofflined/drained, we'll MC ourself if needed
618
    if (self.op.drained is False or self.op.offline is False or
619
        (self.op.master_capable and not node.master_capable)):
620
      if _DecideSelfPromotion(self):
621
        self.op.master_candidate = True
622
        self.LogInfo("Auto-promoting node to master candidate")
623

    
624
    # If we're no longer master capable, we'll demote ourselves from MC
625
    if self.op.master_capable is False and node.master_candidate:
626
      self.LogInfo("Demoting from master candidate")
627
      self.op.master_candidate = False
628

    
629
    # Compute new role
630
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
631
    if self.op.master_candidate:
632
      new_role = self._ROLE_CANDIDATE
633
    elif self.op.drained:
634
      new_role = self._ROLE_DRAINED
635
    elif self.op.offline:
636
      new_role = self._ROLE_OFFLINE
637
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
638
      # False is still in new flags, which means we're un-setting (the
639
      # only) True flag
640
      new_role = self._ROLE_REGULAR
641
    else: # no new flags, nothing, keep old role
642
      new_role = old_role
643

    
644
    self.new_role = new_role
645

    
646
    if old_role == self._ROLE_OFFLINE and new_role != old_role:
647
      # Trying to transition out of offline status
648
      result = self.rpc.call_version([node.uuid])[node.uuid]
649
      if result.fail_msg:
650
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
651
                                   " to report its version: %s" %
652
                                   (node.name, result.fail_msg),
653
                                   errors.ECODE_STATE)
654
      else:
655
        self.LogWarning("Transitioning node from offline to online state"
656
                        " without using re-add. Please make sure the node"
657
                        " is healthy!")
658

    
659
    # When changing the secondary ip, verify if this is a single-homed to
660
    # multi-homed transition or vice versa, and apply the relevant
661
    # restrictions.
662
    if self.op.secondary_ip:
663
      # Ok even without locking, because this can't be changed by any LU
664
      master = self.cfg.GetMasterNodeInfo()
665
      master_singlehomed = master.secondary_ip == master.primary_ip
666
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
667
        if self.op.force and node.uuid == master.uuid:
668
          self.LogWarning("Transitioning from single-homed to multi-homed"
669
                          " cluster; all nodes will require a secondary IP"
670
                          " address")
671
        else:
672
          raise errors.OpPrereqError("Changing the secondary ip on a"
673
                                     " single-homed cluster requires the"
674
                                     " --force option to be passed, and the"
675
                                     " target node to be the master",
676
                                     errors.ECODE_INVAL)
677
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
678
        if self.op.force and node.uuid == master.uuid:
679
          self.LogWarning("Transitioning from multi-homed to single-homed"
680
                          " cluster; secondary IP addresses will have to be"
681
                          " removed")
682
        else:
683
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
684
                                     " same as the primary IP on a multi-homed"
685
                                     " cluster, unless the --force option is"
686
                                     " passed, and the target node is the"
687
                                     " master", errors.ECODE_INVAL)
688

    
689
      assert not (set([inst.name for inst in affected_instances.values()]) -
690
                  self.owned_locks(locking.LEVEL_INSTANCE))
691

    
692
      if node.offline:
693
        if affected_instances:
694
          msg = ("Cannot change secondary IP address: offline node has"
695
                 " instances (%s) configured to use it" %
696
                 utils.CommaJoin(
697
                   [inst.name for inst in affected_instances.values()]))
698
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
699
      else:
700
        # On online nodes, check that no instances are running, and that
701
        # the node has the new ip and we can reach it.
702
        for instance in affected_instances.values():
703
          CheckInstanceState(self, instance, INSTANCE_DOWN,
704
                             msg="cannot change secondary ip")
705

    
706
        _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
707
        if master.uuid != node.uuid:
708
          # check reachability from master secondary ip to new secondary ip
709
          if not netutils.TcpPing(self.op.secondary_ip,
710
                                  constants.DEFAULT_NODED_PORT,
711
                                  source=master.secondary_ip):
712
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
713
                                       " based ping to node daemon port",
714
                                       errors.ECODE_ENVIRON)
715

    
716
    if self.op.ndparams:
717
      new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
718
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
719
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
720
                           "node", "cluster or group")
721
      self.new_ndparams = new_ndparams
722

    
723
    if self.op.hv_state:
724
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
725
                                                node.hv_state_static)
726

    
727
    if self.op.disk_state:
728
      self.new_disk_state = \
729
        MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)
730

    
731
  def Exec(self, feedback_fn):
732
    """Modifies a node.
733

734
    """
735
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
736
    result = []
737

    
738
    if self.op.ndparams:
739
      node.ndparams = self.new_ndparams
740

    
741
    if self.op.powered is not None:
742
      node.powered = self.op.powered
743

    
744
    if self.op.hv_state:
745
      node.hv_state_static = self.new_hv_state
746

    
747
    if self.op.disk_state:
748
      node.disk_state_static = self.new_disk_state
749

    
750
    for attr in ["master_capable", "vm_capable"]:
751
      val = getattr(self.op, attr)
752
      if val is not None:
753
        setattr(node, attr, val)
754
        result.append((attr, str(val)))
755

    
756
    if self.new_role != self.old_role:
757
      # Tell the node to demote itself, if no longer MC and not offline
758
      if self.old_role == self._ROLE_CANDIDATE and \
759
          self.new_role != self._ROLE_OFFLINE:
760
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
761
        if msg:
762
          self.LogWarning("Node failed to demote itself: %s", msg)
763

    
764
      new_flags = self._R2F[self.new_role]
765
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
766
        if of != nf:
767
          result.append((desc, str(nf)))
768
      (node.master_candidate, node.drained, node.offline) = new_flags
769

    
770
      # we locked all nodes, we adjust the CP before updating this node
771
      if self.lock_all:
772
        AdjustCandidatePool(self, [node.uuid])
773

    
774
    if self.op.secondary_ip:
775
      node.secondary_ip = self.op.secondary_ip
776
      result.append(("secondary_ip", self.op.secondary_ip))
777

    
778
    # this will trigger configuration file update, if needed
779
    self.cfg.Update(node, feedback_fn)
780

    
781
    # this will trigger job queue propagation or cleanup if the mc
782
    # flag changed
783
    if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
784
      self.context.ReaddNode(node)
785

    
786
    return result
787

    
788

    
789
class LUNodePowercycle(NoHooksLU):
790
  """Powercycles a node.
791

792
  """
793
  REQ_BGL = False
794

    
795
  def CheckArguments(self):
796
    (self.op.node_uuid, self.op.node_name) = \
797
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
798

    
799
    if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
800
      raise errors.OpPrereqError("The node is the master and the force"
801
                                 " parameter was not set",
802
                                 errors.ECODE_INVAL)
803

    
804
  def ExpandNames(self):
805
    """Locking for PowercycleNode.
806

807
    This is a last-resort option and shouldn't block on other
808
    jobs. Therefore, we grab no locks.
809

810
    """
811
    self.needed_locks = {}
812

    
813
  def Exec(self, feedback_fn):
814
    """Reboots a node.
815

816
    """
817
    default_hypervisor = self.cfg.GetHypervisorType()
818
    hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
819
    result = self.rpc.call_node_powercycle(self.op.node_uuid,
820
                                           default_hypervisor,
821
                                           hvparams)
822
    result.Raise("Failed to schedule the reboot")
823
    return result.payload
824

    
825

    
826
def _GetNodeInstancesInner(cfg, fn):
827
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
828

    
829

    
830
def _GetNodePrimaryInstances(cfg, node_uuid):
831
  """Returns primary instances on a node.
832

833
  """
834
  return _GetNodeInstancesInner(cfg,
835
                                lambda inst: node_uuid == inst.primary_node)
836

    
837

    
838
def _GetNodeSecondaryInstances(cfg, node_uuid):
839
  """Returns secondary instances on a node.
840

841
  """
842
  return _GetNodeInstancesInner(cfg,
843
                                lambda inst: node_uuid in inst.secondary_nodes)
844

    
845

    
846
def _GetNodeInstances(cfg, node_uuid):
847
  """Returns a list of all primary and secondary instances on a node.
848

849
  """
850

    
851
  return _GetNodeInstancesInner(cfg, lambda inst: node_uuid in inst.all_nodes)
852

    
853

    
854
class LUNodeEvacuate(NoHooksLU):
855
  """Evacuates instances off a list of nodes.
856

857
  """
858
  REQ_BGL = False
859

    
860
  def CheckArguments(self):
861
    CheckIAllocatorOrNode(self, "iallocator", "remote_node")
862

    
863
  def ExpandNames(self):
864
    (self.op.node_uuid, self.op.node_name) = \
865
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
866

    
867
    if self.op.remote_node is not None:
868
      (self.op.remote_node_uuid, self.op.remote_node) = \
869
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
870
                              self.op.remote_node)
871
      assert self.op.remote_node
872

    
873
      if self.op.node_uuid == self.op.remote_node_uuid:
874
        raise errors.OpPrereqError("Can not use evacuated node as a new"
875
                                   " secondary node", errors.ECODE_INVAL)
876

    
877
      if self.op.mode != constants.NODE_EVAC_SEC:
878
        raise errors.OpPrereqError("Without the use of an iallocator only"
879
                                   " secondary instances can be evacuated",
880
                                   errors.ECODE_INVAL)
881

    
882
    # Declare locks
883
    self.share_locks = ShareAll()
884
    self.needed_locks = {
885
      locking.LEVEL_INSTANCE: [],
886
      locking.LEVEL_NODEGROUP: [],
887
      locking.LEVEL_NODE: [],
888
      }
889

    
890
    # Determine nodes (via group) optimistically, needs verification once locks
891
    # have been acquired
892
    self.lock_nodes = self._DetermineNodes()
893

    
894
  def _DetermineNodes(self):
895
    """Gets the list of node UUIDs to operate on.
896

897
    """
898
    if self.op.remote_node is None:
899
      # Iallocator will choose any node(s) in the same group
900
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
901
    else:
902
      group_nodes = frozenset([self.op.remote_node_uuid])
903

    
904
    # Determine nodes to be locked
905
    return set([self.op.node_uuid]) | group_nodes
906

    
907
  def _DetermineInstances(self):
908
    """Builds list of instances to operate on.
909

910
    """
911
    assert self.op.mode in constants.NODE_EVAC_MODES
912

    
913
    if self.op.mode == constants.NODE_EVAC_PRI:
914
      # Primary instances only
915
      inst_fn = _GetNodePrimaryInstances
916
      assert self.op.remote_node is None, \
917
        "Evacuating primary instances requires iallocator"
918
    elif self.op.mode == constants.NODE_EVAC_SEC:
919
      # Secondary instances only
920
      inst_fn = _GetNodeSecondaryInstances
921
    else:
922
      # All instances
923
      assert self.op.mode == constants.NODE_EVAC_ALL
924
      inst_fn = _GetNodeInstances
925
      # TODO: In 2.6, change the iallocator interface to take an evacuation mode
926
      # per instance
927
      raise errors.OpPrereqError("Due to an issue with the iallocator"
928
                                 " interface it is not possible to evacuate"
929
                                 " all instances at once; specify explicitly"
930
                                 " whether to evacuate primary or secondary"
931
                                 " instances",
932
                                 errors.ECODE_INVAL)
933

    
934
    return inst_fn(self.cfg, self.op.node_uuid)
935

    
936
  def DeclareLocks(self, level):
937
    if level == locking.LEVEL_INSTANCE:
938
      # Lock instances optimistically, needs verification once node and group
939
      # locks have been acquired
940
      self.needed_locks[locking.LEVEL_INSTANCE] = \
941
        set(i.name for i in self._DetermineInstances())
942

    
943
    elif level == locking.LEVEL_NODEGROUP:
944
      # Lock node groups for all potential target nodes optimistically, needs
945
      # verification once nodes have been acquired
946
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
947
        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)
948

    
949
    elif level == locking.LEVEL_NODE:
950
      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
951

    
952
  def CheckPrereq(self):
953
    # Verify locks
954
    owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
955
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
956
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
957

    
958
    need_nodes = self._DetermineNodes()
959

    
960
    if not owned_nodes.issuperset(need_nodes):
961
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
962
                                 " locks were acquired, current nodes are"
963
                                 " are '%s', used to be '%s'; retry the"
964
                                 " operation" %
965
                                 (self.op.node_name,
966
                                  utils.CommaJoin(need_nodes),
967
                                  utils.CommaJoin(owned_nodes)),
968
                                 errors.ECODE_STATE)
969

    
970
    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
971
    if owned_groups != wanted_groups:
972
      raise errors.OpExecError("Node groups changed since locks were acquired,"
973
                               " current groups are '%s', used to be '%s';"
974
                               " retry the operation" %
975
                               (utils.CommaJoin(wanted_groups),
976
                                utils.CommaJoin(owned_groups)))
977

    
978
    # Determine affected instances
979
    self.instances = self._DetermineInstances()
980
    self.instance_names = [i.name for i in self.instances]
981

    
982
    if set(self.instance_names) != owned_instance_names:
983
      raise errors.OpExecError("Instances on node '%s' changed since locks"
984
                               " were acquired, current instances are '%s',"
985
                               " used to be '%s'; retry the operation" %
986
                               (self.op.node_name,
987
                                utils.CommaJoin(self.instance_names),
988
                                utils.CommaJoin(owned_instance_names)))
989

    
990
    if self.instance_names:
991
      self.LogInfo("Evacuating instances from node '%s': %s",
992
                   self.op.node_name,
993
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
994
    else:
995
      self.LogInfo("No instances to evacuate from node '%s'",
996
                   self.op.node_name)
997

    
998
    if self.op.remote_node is not None:
999
      for i in self.instances:
1000
        if i.primary_node == self.op.remote_node_uuid:
1001
          raise errors.OpPrereqError("Node %s is the primary node of"
1002
                                     " instance %s, cannot use it as"
1003
                                     " secondary" %
1004
                                     (self.op.remote_node, i.name),
1005
                                     errors.ECODE_INVAL)
1006

    
1007
  def Exec(self, feedback_fn):
1008
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)
1009

    
1010
    if not self.instance_names:
1011
      # No instances to evacuate
1012
      jobs = []
1013

    
1014
    elif self.op.iallocator is not None:
1015
      # TODO: Implement relocation to other group
1016
      req = iallocator.IAReqNodeEvac(evac_mode=self.op.mode,
1017
                                     instances=list(self.instance_names))
1018
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)
1019

    
1020
      ial.Run(self.op.iallocator)
1021

    
1022
      if not ial.success:
1023
        raise errors.OpPrereqError("Can't compute node evacuation using"
1024
                                   " iallocator '%s': %s" %
1025
                                   (self.op.iallocator, ial.info),
1026
                                   errors.ECODE_NORES)
1027

    
1028
      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)
1029

    
1030
    elif self.op.remote_node is not None:
1031
      assert self.op.mode == constants.NODE_EVAC_SEC
1032
      jobs = [
1033
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
1034
                                        remote_node=self.op.remote_node,
1035
                                        disks=[],
1036
                                        mode=constants.REPLACE_DISK_CHG,
1037
                                        early_release=self.op.early_release)]
1038
        for instance_name in self.instance_names]
1039

    
1040
    else:
1041
      raise errors.ProgrammerError("No iallocator or remote node")
1042

    
1043
    return ResultWithJobs(jobs)
1044

    
1045

    
1046
class LUNodeMigrate(LogicalUnit):
1047
  """Migrate all instances from a node.
1048

1049
  """
1050
  HPATH = "node-migrate"
1051
  HTYPE = constants.HTYPE_NODE
1052
  REQ_BGL = False
1053

    
1054
  def CheckArguments(self):
1055
    pass
1056

    
1057
  def ExpandNames(self):
1058
    (self.op.node_uuid, self.op.node_name) = \
1059
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1060

    
1061
    self.share_locks = ShareAll()
1062
    self.needed_locks = {
1063
      locking.LEVEL_NODE: [self.op.node_uuid],
1064
      }
1065

    
1066
  def BuildHooksEnv(self):
1067
    """Build hooks env.
1068

1069
    This runs on the master, the primary and all the secondaries.
1070

1071
    """
1072
    return {
1073
      "NODE_NAME": self.op.node_name,
1074
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
1075
      }
1076

    
1077
  def BuildHooksNodes(self):
1078
    """Build hooks nodes.
1079

1080
    """
1081
    nl = [self.cfg.GetMasterNode()]
1082
    return (nl, nl)
1083

    
1084
  def CheckPrereq(self):
1085
    pass
1086

    
1087
  def Exec(self, feedback_fn):
1088
    # Prepare jobs for migration instances
1089
    jobs = [
1090
      [opcodes.OpInstanceMigrate(
1091
        instance_name=inst.name,
1092
        mode=self.op.mode,
1093
        live=self.op.live,
1094
        iallocator=self.op.iallocator,
1095
        target_node=self.op.target_node,
1096
        allow_runtime_changes=self.op.allow_runtime_changes,
1097
        ignore_ipolicy=self.op.ignore_ipolicy)]
1098
      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]
1099

    
1100
    # TODO: Run iallocator in this opcode and pass correct placement options to
1101
    # OpInstanceMigrate. Since other jobs can modify the cluster between
1102
    # running the iallocator and the actual migration, a good consistency model
1103
    # will have to be found.
1104

    
1105
    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
1106
            frozenset([self.op.node_uuid]))
1107

    
1108
    return ResultWithJobs(jobs)
1109

    
1110

    
1111
def _GetStorageTypeArgs(cfg, storage_type):
1112
  """Returns the arguments for a storage type.
1113

1114
  """
1115
  # Special case for file storage
1116
  if storage_type == constants.ST_FILE:
1117
    # storage.FileStorage wants a list of storage directories
1118
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1119

    
1120
  return []
1121

    
1122

    
1123
class LUNodeModifyStorage(NoHooksLU):
1124
  """Logical unit for modifying a storage volume on a node.
1125

1126
  """
1127
  REQ_BGL = False
1128

    
1129
  def CheckArguments(self):
1130
    (self.op.node_uuid, self.op.node_name) = \
1131
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1132

    
1133
    storage_type = self.op.storage_type
1134

    
1135
    try:
1136
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
1137
    except KeyError:
1138
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
1139
                                 " modified" % storage_type,
1140
                                 errors.ECODE_INVAL)
1141

    
1142
    diff = set(self.op.changes.keys()) - modifiable
1143
    if diff:
1144
      raise errors.OpPrereqError("The following fields can not be modified for"
1145
                                 " storage units of type '%s': %r" %
1146
                                 (storage_type, list(diff)),
1147
                                 errors.ECODE_INVAL)
1148

    
1149
  def CheckPrereq(self):
1150
    """Check prerequisites.
1151

1152
    """
1153
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1154

    
1155
  def ExpandNames(self):
1156
    self.needed_locks = {
1157
      locking.LEVEL_NODE: self.op.node_uuid,
1158
      }
1159

    
1160
  def Exec(self, feedback_fn):
1161
    """Computes the list of nodes and their attributes.
1162

1163
    """
1164
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1165
    result = self.rpc.call_storage_modify(self.op.node_uuid,
1166
                                          self.op.storage_type, st_args,
1167
                                          self.op.name, self.op.changes)
1168
    result.Raise("Failed to modify storage unit '%s' on %s" %
1169
                 (self.op.name, self.op.node_name))
1170

    
1171

    
1172
class NodeQuery(QueryBase):
1173
  FIELDS = query.NODE_FIELDS
1174

    
1175
  def ExpandNames(self, lu):
1176
    lu.needed_locks = {}
1177
    lu.share_locks = ShareAll()
1178

    
1179
    if self.names:
1180
      (self.wanted, _) = GetWantedNodes(lu, self.names)
1181
    else:
1182
      self.wanted = locking.ALL_SET
1183

    
1184
    self.do_locking = (self.use_locking and
1185
                       query.NQ_LIVE in self.requested_data)
1186

    
1187
    if self.do_locking:
1188
      # If any non-static field is requested we need to lock the nodes
1189
      lu.needed_locks[locking.LEVEL_NODE] = self.wanted
1190
      lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1191

    
1192
  def DeclareLocks(self, lu, level):
1193
    pass
1194

    
1195
  def _GetQueryData(self, lu):
1196
    """Computes the list of nodes and their attributes.
1197

1198
    """
1199
    all_info = lu.cfg.GetAllNodesInfo()
1200

    
1201
    node_uuids = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)
1202

    
1203
    # Gather data as requested
1204
    if query.NQ_LIVE in self.requested_data:
1205
      # filter out non-vm_capable nodes
1206
      toquery_node_uuids = [node.uuid for node in all_info.values()
1207
                            if node.vm_capable and node.uuid in node_uuids]
1208
      default_template = lu.cfg.GetClusterInfo().enabled_disk_templates[0]
1209
      raw_storage_units = utils.storage.GetStorageUnits(
1210
          lu.cfg, [default_template])
1211
      storage_units = rpc.PrepareStorageUnitsForNodes(
1212
          lu.cfg, raw_storage_units, toquery_node_uuids)
1213
      default_hypervisor = lu.cfg.GetHypervisorType()
1214
      hvparams = lu.cfg.GetClusterInfo().hvparams[default_hypervisor]
1215
      hvspecs = [(default_hypervisor, hvparams)]
1216
      node_data = lu.rpc.call_node_info(toquery_node_uuids, storage_units,
1217
                                        hvspecs)
1218
      live_data = dict(
1219
          (uuid, rpc.MakeLegacyNodeInfo(nresult.payload, default_template))
1220
          for (uuid, nresult) in node_data.items()
1221
          if not nresult.fail_msg and nresult.payload)
1222
    else:
1223
      live_data = None
1224

    
1225
    if query.NQ_INST in self.requested_data:
1226
      node_to_primary = dict([(uuid, set()) for uuid in node_uuids])
1227
      node_to_secondary = dict([(uuid, set()) for uuid in node_uuids])
1228

    
1229
      inst_data = lu.cfg.GetAllInstancesInfo()
1230
      inst_uuid_to_inst_name = {}
1231

    
1232
      for inst in inst_data.values():
1233
        inst_uuid_to_inst_name[inst.uuid] = inst.name
1234
        if inst.primary_node in node_to_primary:
1235
          node_to_primary[inst.primary_node].add(inst.uuid)
1236
        for secnode in inst.secondary_nodes:
1237
          if secnode in node_to_secondary:
1238
            node_to_secondary[secnode].add(inst.uuid)
1239
    else:
1240
      node_to_primary = None
1241
      node_to_secondary = None
1242
      inst_uuid_to_inst_name = None
1243

    
1244
    if query.NQ_OOB in self.requested_data:
1245
      oob_support = dict((uuid, bool(SupportsOob(lu.cfg, node)))
1246
                         for uuid, node in all_info.iteritems())
1247
    else:
1248
      oob_support = None
1249

    
1250
    if query.NQ_GROUP in self.requested_data:
1251
      groups = lu.cfg.GetAllNodeGroupsInfo()
1252
    else:
1253
      groups = {}
1254

    
1255
    return query.NodeQueryData([all_info[uuid] for uuid in node_uuids],
1256
                               live_data, lu.cfg.GetMasterNode(),
1257
                               node_to_primary, node_to_secondary,
1258
                               inst_uuid_to_inst_name, groups, oob_support,
1259
                               lu.cfg.GetClusterInfo())
1260

    
1261

    
1262
class LUNodeQuery(NoHooksLU):
1263
  """Logical unit for querying nodes.
1264

1265
  """
1266
  # pylint: disable=W0142
1267
  REQ_BGL = False
1268

    
1269
  def CheckArguments(self):
1270
    self.nq = NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
1271
                         self.op.output_fields, self.op.use_locking)
1272

    
1273
  def ExpandNames(self):
1274
    self.nq.ExpandNames(self)
1275

    
1276
  def DeclareLocks(self, level):
1277
    self.nq.DeclareLocks(self, level)
1278

    
1279
  def Exec(self, feedback_fn):
1280
    return self.nq.OldStyleQuery(self)
1281

    
1282

    
1283
def _CheckOutputFields(fields, selected):
  """Checks whether all selected fields are valid according to fields.

  @type fields: L{utils.FieldSet}
  @param fields: the set of valid output fields
  @type selected: L{utils.FieldSet}
  @param selected: the set of output fields requested by the user

  """
  delta = fields.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)


class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(constants.VF_NODE, constants.VF_PHYS,
                                      constants.VF_VG, constants.VF_NAME,
                                      constants.VF_SIZE, constants.VF_INSTANCE),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

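    # Lock only the explicitly requested nodes, if any; otherwise lock all
    # nodes together with the node allocation lock.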
    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    volumes = self.rpc.call_node_volumes(node_uuids)

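    # vol2inst maps (node_uuid, "<vg>/<lv name>") to the owning instance and
    # is used below to fill the constants.VF_INSTANCE output field.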
    ilist = self.cfg.GetAllInstancesInfo()
    vol2inst = MapInstanceLvsToNodes(ilist.values())

    output = []
    for node_uuid in node_uuids:
      nresult = volumes[node_uuid]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue

      node_vols = sorted(nresult.payload,
                         key=operator.itemgetter(constants.VF_DEV))

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == constants.VF_NODE:
            val = self.cfg.GetNodeName(node_uuid)
          elif field == constants.VF_PHYS:
            val = vol[constants.VF_DEV]
          elif field == constants.VF_VG:
            val = vol[constants.VF_VG]
          elif field == constants.VF_NAME:
            val = vol[constants.VF_NAME]
          elif field == constants.VF_SIZE:
            val = int(float(vol[constants.VF_SIZE]))
          elif field == constants.VF_INSTANCE:
            inst = vol2inst.get((node_uuid, vol[constants.VF_VG] + "/" +
                                 vol[constants.VF_NAME]), None)
            if inst is not None:
              val = inst.name
            else:
              val = "-"
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def _DetermineStorageType(self):
    """Determines the default storage type of the cluster.

    """
    enabled_disk_templates = self.cfg.GetClusterInfo().enabled_disk_templates
    default_storage_type = \
        constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[enabled_disk_templates[0]]
    return default_storage_type

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.storage_type:
      CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()
      if self.storage_type not in constants.STS_REPORT:
        raise errors.OpPrereqError(
            "Storage reporting for storage type '%s' is not supported. Please"
            " use the --storage-type option to specify one of the supported"
            " storage types (%s) or set the default disk template to one that"
            " supports storage reporting." %
            (self.storage_type, utils.CommaJoin(constants.STS_REPORT)))

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    if self.op.storage_type:
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()

    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

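    # Query every locked node for its storage units of the selected type;
    # each row in the reply holds the values for the columns in 'fields'.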
    st_args = _GetStorageTypeArgs(self.cfg, self.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      nresult = data[node_uuid]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s",
                        node_name, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node_name
          elif field == constants.SF_TYPE:
            val = self.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result


class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_uuid)
    except ValueError:
      pass
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    assert node is not None

    masternode = self.cfg.GetMasterNode()
    if node.uuid == masternode:
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)

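    # A node that is still the primary or a secondary node of any instance
    # cannot be removed.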
    for _, instance in self.cfg.GetAllInstancesInfo().items():
      if node.uuid in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first" % instance.name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    logging.info("Stopping the node daemon and removing configs from node %s",
                 self.node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # Promote nodes to master candidate as needed
    AdjustCandidatePool(self, exceptions=[self.node.uuid])
    self.context.RemoveNode(self.node)

    # Run post hooks on the node before it's removed
    RunPostHook(self, self.node.name)

    # we have to call this by name rather than by UUID, as the node is no
    # longer in the config
    result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node_uuid = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node_uuid,
                                              constants.ETC_HOSTS_REMOVE,
                                              self.node.name, None)
      result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)


class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

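    # Only storage types that support the "fix consistency" operation can be
    # repaired.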
    if (constants.SO_FIX_CONSISTENCY not in
        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " repaired" % storage_type,
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def _CheckFaultyDisks(self, instance, node_uuid):
    """Ensure faulty disks abort the opcode or at least warn."""
    try:
      if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                 node_uuid, True):
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                   " node '%s'" %
                                   (instance.name,
                                    self.cfg.GetNodeName(node_uuid)),
                                   errors.ECODE_STATE)
    except errors.OpPrereqError, err:
      if self.op.ignore_consistency:
        self.LogWarning(str(err.args[0]))
      else:
        raise

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

    # Check whether any instance on this node has faulty disks
    for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
      if not inst.disks_active:
        continue
      check_nodes = set(inst.all_nodes)
      check_nodes.discard(self.op.node_uuid)
      for inst_node_uuid in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_uuid)

  def Exec(self, feedback_fn):
    feedback_fn("Repairing storage unit '%s' on %s ..." %
                (self.op.name, self.op.node_name))

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_execute(self.op.node_uuid,
                                           self.op.storage_type, st_args,
                                           self.op.name,
                                           constants.SO_FIX_CONSISTENCY)
    result.Raise("Failed to repair storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))