root / lib / cmdlib / node.py @ 84ad6b78


#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with nodes."""

import logging
import operator

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import qlang
from ganeti import query
from ganeti import rpc
from ganeti import utils
from ganeti.masterd import iallocator

from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
  ResultWithJobs
from ganeti.cmdlib.common import CheckParamsNotGlobal, \
  MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  IsExclusiveStorageEnabledNode, CheckNodePVs, \
  RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
  CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
  AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
  GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
  FindFaultyInstanceDisks, CheckStorageTypeEnabled


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max by one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should
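  # Worked example (illustrative numbers, not taken from a real cluster): with
  # candidate_pool_size = 10, mc_now = 2 and mc_should = 2, adding a node
  # raises mc_should to min(2 + 1, 10) = 3, so 2 < 3 and the new node promotes
  # itself to master candidate.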


def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: L{objects.Node}
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip,
  and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  # this can be called with a new node, which has no UUID yet, so perform the
  # RPC call using its name
  result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node.name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)


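# The LUs in this module follow the usual LogicalUnit flow: CheckArguments,
# the locking/hook callbacks, then CheckPrereq and finally Exec. LUNodeAdd
# below handles both adding a brand-new node and re-adding an existing one
# (self.op.readd).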
class LUNodeAdd(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _NFLAGS = ["master_capable", "vm_capable"]

  def CheckArguments(self):
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
    # validate/normalize the node name
    self.hostname = netutils.GetHostname(name=self.op.node_name,
                                         family=self.primary_ip_family)
    self.op.node_name = self.hostname.name

    if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
      raise errors.OpPrereqError("Cannot readd the master node",
                                 errors.ECODE_STATE)

    if self.op.readd and self.op.group:
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                 " being readded", errors.ECODE_INVAL)

    if self.op.ndparams:
      ovs = self.op.ndparams.get(constants.ND_OVS, None)
      ovs_name = self.op.ndparams.get(constants.ND_OVS_NAME, None)
      ovs_link = self.op.ndparams.get(constants.ND_OVS_LINK, None)

      # OpenvSwitch: Warn user if link is missing
      if ovs and not ovs_link:
        self.LogInfo("No physical interface for OpenvSwitch was given."
                     " OpenvSwitch will not have an outside connection. This"
                     " might not be what you want.")
      # OpenvSwitch: Fail if parameters are given, but OVS is not enabled.
      if not ovs and (ovs_name or ovs_link):
        raise errors.OpPrereqError("OpenvSwitch name or link were given, but"
                                   " OpenvSwitch is not enabled. Please enable"
                                   " OpenvSwitch with --ovs",
                                   errors.ECODE_INVAL)
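    # Illustrative ndparams (hypothetical values): passing something like
    #   {constants.ND_OVS: True, constants.ND_OVS_NAME: "switch1",
    #    constants.ND_OVS_LINK: "eth0"}
    # passes both checks above, while omitting ND_OVS but keeping either of
    # the other two keys triggers the OpPrereqError.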

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    hook_nodes = self.cfg.GetNodeList()
    new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
    if new_node_info is not None:
      # Exclude added node
      hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))

    # add the new node as post hook node by name; it does not have a UUID yet
    return (hook_nodes, hook_nodes, [self.op.node_name, ])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.hostname.name
    self.op.primary_ip = self.hostname.ip
    if self.op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using an IPv6 primary address, a valid"
                                   " IPv4 address must be given as secondary",
                                   errors.ECODE_INVAL)
      self.op.secondary_ip = self.op.primary_ip

    secondary_ip = self.op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip, errors.ECODE_INVAL)

    existing_node_info = self.cfg.GetNodeInfoByName(node_name)
    if not self.op.readd and existing_node_info is not None:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node_name, errors.ECODE_EXISTS)
    elif self.op.readd and existing_node_info is None:
      raise errors.OpPrereqError("Node %s is not in the configuration" %
                                 node_name, errors.ECODE_NOENT)

    self.changed_primary_ip = False

    for existing_node in self.cfg.GetAllNodesInfo().values():
      if self.op.readd and node_name == existing_node.name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != self.op.primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == self.op.primary_ip or
          existing_node.secondary_ip == self.op.primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # After this 'if' block, None is no longer a valid value for the
    # _capable op attributes
    if self.op.readd:
      assert existing_node_info is not None, \
        "Can't retrieve locked node %s" % node_name
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, getattr(existing_node_info, attr))
    else:
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, True)

    if self.op.readd and not self.op.vm_capable:
      pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % node_name,
                                   errors.ECODE_STATE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = self.cfg.GetMasterNodeInfo()
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == self.op.primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                              source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [existing_node_info.uuid]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False

    if self.op.readd:
      self.new_node = existing_node_info
    else:
      node_group = self.cfg.LookupNodeGroup(self.op.group)
      self.new_node = objects.Node(name=node_name,
                                   primary_ip=self.op.primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=node_group, ndparams={})

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)

    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
    #       it a property on the base class.
    rpcrunner = rpc.DnsOnlyRunner()
    result = rpcrunner.call_version([node_name])[node_name]
    result.Raise("Can't get version information from node %s" % node_name)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node_name, result.payload)
    else:
      raise errors.OpPrereqError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.payload),
                                 errors.ECODE_ENVIRON)

    vg_name = self.cfg.GetVGName()
    if vg_name is not None:
      vparams = {constants.NV_PVLIST: [vg_name]}
      excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
      cname = self.cfg.GetClusterName()
      result = rpcrunner.call_node_verify_light(
          [node_name], vparams, cname,
          self.cfg.GetClusterInfo().hvparams)[node_name]
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
      if errmsgs:
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # We are adding a new node, so we assume it's powered
    self.new_node.powered = True

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      self.new_node.drained = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      self.new_node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        self.new_node.primary_ip = self.op.primary_ip

    # copy the master/vm_capable flags
    for attr in self._NFLAGS:
      setattr(self.new_node, attr, getattr(self.op, attr))

    # notify the user about any possible mc promotion
    if self.new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    if self.op.ndparams:
      self.new_node.ndparams = self.op.ndparams
    else:
      self.new_node.ndparams = {}

    if self.op.hv_state:
      self.new_node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.new_node.disk_state_static = self.new_disk_state

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(
                 master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
                 self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    if self.new_node.secondary_ip != self.new_node.primary_ip:
      _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
                               False)

    node_verifier_uuids = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: ([self.new_node.name], {}),
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(
               node_verifier_uuids, node_verify_param,
               self.cfg.GetClusterName(),
               self.cfg.GetClusterInfo().hvparams)
    for verifier in node_verifier_uuids:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed")

    # OpenvSwitch initialization on the node
    ovs = self.new_node.ndparams.get(constants.ND_OVS, None)
    ovs_name = self.new_node.ndparams.get(constants.ND_OVS_NAME, None)
    ovs_link = self.new_node.ndparams.get(constants.ND_OVS_LINK, None)

    if ovs:
      result = self.rpc.call_node_configure_ovs(
                 self.new_node.name, ovs_name, ovs_link)

    if self.op.readd:
      self.context.ReaddNode(self.new_node)
      RedistributeAncillaryFiles(self)
      # make sure we redistribute the config
      self.cfg.Update(self.new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not self.new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
        result.Warn("Node failed to demote itself from master candidate status",
                    self.LogWarning)
    else:
      self.context.AddNode(self.new_node, self.proc.GetECId())
      RedistributeAncillaryFiles(self)


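# Sketch of typical invocations driving the LU below (assuming the standard
# gnt-node client syntax; the flags shown are illustrative):
#   gnt-node modify --offline=yes node3.example.com
#   gnt-node modify --master-candidate=no --auto-promote node3.example.com
# CheckArguments rejects requests that try to set more than one role flag to
# True at the same time.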
class LUNodeSetParams(LogicalUnit):
  """Modifies the parameters of a node.

  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
      to the node role (as _ROLE_*)
  @cvar _R2F: a dictionary from node role to tuples of flags
  @cvar _FLAGS: a list of attribute names corresponding to the flags

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
  _F2R = {
    (True, False, False): _ROLE_CANDIDATE,
    (False, True, False): _ROLE_DRAINED,
    (False, False, True): _ROLE_OFFLINE,
    (False, False, False): _ROLE_REGULAR,
    }
  _R2F = dict((v, k) for k, v in _F2R.items())
  _FLAGS = ["master_candidate", "drained", "offline"]
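  # The flag tuples used as _F2R keys follow the same order as _FLAGS:
  # (master_candidate, drained, offline). For example, (False, True, False)
  # is a drained node, mapped to _ROLE_DRAINED and back again via _R2F.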

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
                self.op.master_capable, self.op.vm_capable,
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
                self.op.disk_state]
    if all_mods.count(None) == len(all_mods):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # Boolean value that tells us whether we might be demoting from MC
    self.might_demote = (self.op.master_candidate is False or
                         self.op.offline is True or
                         self.op.drained is True or
                         self.op.master_capable is False)

    if self.op.secondary_ip:
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                   " address" % self.op.secondary_ip,
                                   errors.ECODE_INVAL)

    self.lock_all = self.op.auto_promote and self.might_demote
    self.lock_instances = self.op.secondary_ip is not None

  def _InstanceFilter(self, instance):
    """Filter for getting affected instances.

    """
    return (instance.disk_template in constants.DTS_INT_MIRROR and
            self.op.node_uuid in instance.all_nodes)

  def ExpandNames(self):
    if self.lock_all:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,

        # Block allocations when all nodes are locked
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: self.op.node_uuid,
        }

    # Since modifying a node can have severe effects on currently running
    # operations the resource lock is at least acquired in shared mode
    self.needed_locks[locking.LEVEL_NODE_RES] = \
      self.needed_locks[locking.LEVEL_NODE]

    # Get all locks except nodes in shared mode; they are not used for anything
    # but read-only access
    self.share_locks = ShareAll()
    self.share_locks[locking.LEVEL_NODE] = 0
    self.share_locks[locking.LEVEL_NODE_RES] = 0
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 0

    if self.lock_instances:
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(
          self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    if self.lock_instances:
      affected_instances = \
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)

      # Verify instance locks
      owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
      wanted_instance_names = frozenset([inst.name for inst in
                                         affected_instances.values()])
      if wanted_instance_names - owned_instance_names:
        raise errors.OpPrereqError("Instances affected by changing node %s's"
                                   " secondary IP address have changed since"
                                   " locks were acquired, wanted '%s', have"
                                   " '%s'; retry the operation" %
                                   (node.name,
                                    utils.CommaJoin(wanted_instance_names),
                                    utils.CommaJoin(owned_instance_names)),
                                   errors.ECODE_STATE)
    else:
      affected_instances = None

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if node.uuid == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via master-failover",
                                   errors.ECODE_INVAL)

    if self.op.master_candidate and not node.master_capable:
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
                                 " it a master candidate" % node.name,
                                 errors.ECODE_STATE)

    if self.op.vm_capable is False:
      (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
      if ipri or isec:
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
                                   " the vm_capable flag" % node.name,
                                   errors.ECODE_STATE)

    if node.master_candidate and self.might_demote and not self.lock_all:
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
      # check if after removing the current node, we're missing master
      # candidates
      (mc_remaining, mc_should, _) = \
          self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
      if mc_remaining < mc_should:
        raise errors.OpPrereqError("Not enough master candidates, please"
                                   " pass auto promote option to allow"
                                   " promotion (--auto-promote or RAPI"
                                   " auto_promote=True)", errors.ECODE_STATE)

    self.old_flags = old_flags = (node.master_candidate,
                                  node.drained, node.offline)
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
    self.old_role = old_role = self._F2R[old_flags]

    # Check for ineffective changes
    for attr in self._FLAGS:
      if getattr(self.op, attr) is False and getattr(node, attr) is False:
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
        setattr(self.op, attr, None)

    # Past this point, any flag change to False means a transition
    # away from the respective state, as only real changes are kept

    # TODO: We might query the real power state if it supports OOB
    if SupportsOob(self.cfg, node):
      if self.op.offline is False and not (node.powered or
                                           self.op.powered is True):
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
                                    " offline status can be reset") %
                                   self.op.node_name, errors.ECODE_STATE)
    elif self.op.powered is not None:
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
                                  " as it does not support out-of-band"
                                  " handling") % self.op.node_name,
                                 errors.ECODE_STATE)

    # If we're being deofflined/drained, we'll MC ourself if needed
    if (self.op.drained is False or self.op.offline is False or
        (self.op.master_capable and not node.master_capable)):
      if _DecideSelfPromotion(self):
        self.op.master_candidate = True
        self.LogInfo("Auto-promoting node to master candidate")

    # If we're no longer master capable, we'll demote ourselves from MC
    if self.op.master_capable is False and node.master_candidate:
      self.LogInfo("Demoting from master candidate")
      self.op.master_candidate = False

    # Compute new role
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
    if self.op.master_candidate:
      new_role = self._ROLE_CANDIDATE
    elif self.op.drained:
      new_role = self._ROLE_DRAINED
    elif self.op.offline:
      new_role = self._ROLE_OFFLINE
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
      # False is still in new flags, which means we're un-setting (the
      # only) True flag
      new_role = self._ROLE_REGULAR
    else: # no new flags, nothing, keep old role
      new_role = old_role

    self.new_role = new_role

    if old_role == self._ROLE_OFFLINE and new_role != old_role:
      # Trying to transition out of offline status
      result = self.rpc.call_version([node.uuid])[node.uuid]
      if result.fail_msg:
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
                                   " to report its version: %s" %
                                   (node.name, result.fail_msg),
                                   errors.ECODE_STATE)
      else:
        self.LogWarning("Transitioning node from offline to online state"
                        " without using re-add. Please make sure the node"
                        " is healthy!")

    # When changing the secondary ip, verify if this is a single-homed to
    # multi-homed transition or vice versa, and apply the relevant
    # restrictions.
    if self.op.secondary_ip:
      # Ok even without locking, because this can't be changed by any LU
      master = self.cfg.GetMasterNodeInfo()
      master_singlehomed = master.secondary_ip == master.primary_ip
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from single-homed to multi-homed"
                          " cluster; all nodes will require a secondary IP"
                          " address")
        else:
          raise errors.OpPrereqError("Changing the secondary ip on a"
                                     " single-homed cluster requires the"
                                     " --force option to be passed, and the"
                                     " target node to be the master",
                                     errors.ECODE_INVAL)
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from multi-homed to single-homed"
                          " cluster; secondary IP addresses will have to be"
                          " removed")
        else:
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
                                     " same as the primary IP on a multi-homed"
                                     " cluster, unless the --force option is"
                                     " passed, and the target node is the"
                                     " master", errors.ECODE_INVAL)

      assert not (set([inst.name for inst in affected_instances.values()]) -
                  self.owned_locks(locking.LEVEL_INSTANCE))

      if node.offline:
        if affected_instances:
          msg = ("Cannot change secondary IP address: offline node has"
                 " instances (%s) configured to use it" %
                 utils.CommaJoin(
                   [inst.name for inst in affected_instances.values()]))
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
      else:
        # On online nodes, check that no instances are running, and that
        # the node has the new ip and we can reach it.
        for instance in affected_instances.values():
          CheckInstanceState(self, instance, INSTANCE_DOWN,
                             msg="cannot change secondary ip")

        _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
        if master.uuid != node.uuid:
          # check reachability from master secondary ip to new secondary ip
          if not netutils.TcpPing(self.op.secondary_ip,
                                  constants.DEFAULT_NODED_PORT,
                                  source=master.secondary_ip):
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                       " based ping to node daemon port",
                                       errors.ECODE_ENVIRON)

    if self.op.ndparams:
      new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")
      self.new_ndparams = new_ndparams

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                                node.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    result = []

    if self.op.ndparams:
      node.ndparams = self.new_ndparams

    if self.op.powered is not None:
      node.powered = self.op.powered

    if self.op.hv_state:
      node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      node.disk_state_static = self.new_disk_state

    for attr in ["master_capable", "vm_capable"]:
      val = getattr(self.op, attr)
      if val is not None:
        setattr(node, attr, val)
        result.append((attr, str(val)))

    if self.new_role != self.old_role:
      # Tell the node to demote itself, if no longer MC and not offline
      if self.old_role == self._ROLE_CANDIDATE and \
          self.new_role != self._ROLE_OFFLINE:
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s", msg)

      new_flags = self._R2F[self.new_role]
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
        if of != nf:
          result.append((desc, str(nf)))
      (node.master_candidate, node.drained, node.offline) = new_flags

      # we locked all nodes, we adjust the CP before updating this node
      if self.lock_all:
        AdjustCandidatePool(self, [node.uuid])

    if self.op.secondary_ip:
      node.secondary_ip = self.op.secondary_ip
      result.append(("secondary_ip", self.op.secondary_ip))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)

    # this will trigger job queue propagation or cleanup if the mc
    # flag changed
    if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
      self.context.ReaddNode(node)

    return result
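    # The value returned above is a list of (name, new value) pairs describing
    # what actually changed, e.g. [("master_candidate", "False"),
    # ("offline", "True")] for an offline transition (illustrative example).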


class LUNodePowercycle(NoHooksLU):
  """Powercycles a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    default_hypervisor = self.cfg.GetHypervisorType()
    hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
    result = self.rpc.call_node_powercycle(self.op.node_uuid,
                                           default_hypervisor,
                                           hvparams)
    result.Raise("Failed to schedule the reboot")
    return result.payload


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodePrimaryInstances(cfg, node_uuid):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_uuid):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid in inst.secondary_nodes)


def _GetNodeInstances(cfg, node_uuid):
  """Returns a list of all primary and secondary instances on a node.

  """

  return _GetNodeInstancesInner(cfg, lambda inst: node_uuid in inst.all_nodes)
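  # Note: the three helpers above only differ in the predicate passed to
  # _GetNodeInstancesInner: primary_node equality, membership in
  # secondary_nodes, or membership in all_nodes respectively.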


class LUNodeEvacuate(NoHooksLU):
  """Evacuates instances off a list of nodes.

  """
  REQ_BGL = False

  def CheckArguments(self):
    CheckIAllocatorOrNode(self, "iallocator", "remote_node")

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)
      assert self.op.remote_node

      if self.op.node_uuid == self.op.remote_node_uuid:
        raise errors.OpPrereqError("Can not use evacuated node as a new"
                                   " secondary node", errors.ECODE_INVAL)

      if self.op.mode != constants.NODE_EVAC_SEC:
        raise errors.OpPrereqError("Without the use of an iallocator only"
                                   " secondary instances can be evacuated",
                                   errors.ECODE_INVAL)

    # Declare locks
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

    # Determine nodes (via group) optimistically, needs verification once locks
    # have been acquired
    self.lock_nodes = self._DetermineNodes()

  def _DetermineNodes(self):
    """Gets the list of node UUIDs to operate on.

    """
    if self.op.remote_node is None:
      # Iallocator will choose any node(s) in the same group
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
    else:
      group_nodes = frozenset([self.op.remote_node_uuid])

    # Determine nodes to be locked
    return set([self.op.node_uuid]) | group_nodes

  def _DetermineInstances(self):
    """Builds list of instances to operate on.

    """
    assert self.op.mode in constants.NODE_EVAC_MODES

    if self.op.mode == constants.NODE_EVAC_PRI:
      # Primary instances only
      inst_fn = _GetNodePrimaryInstances
      assert self.op.remote_node is None, \
        "Evacuating primary instances requires iallocator"
    elif self.op.mode == constants.NODE_EVAC_SEC:
      # Secondary instances only
      inst_fn = _GetNodeSecondaryInstances
    else:
      # All instances
      assert self.op.mode == constants.NODE_EVAC_ALL
      inst_fn = _GetNodeInstances
      # TODO: In 2.6, change the iallocator interface to take an evacuation mode
      # per instance
      raise errors.OpPrereqError("Due to an issue with the iallocator"
                                 " interface it is not possible to evacuate"
                                 " all instances at once; specify explicitly"
                                 " whether to evacuate primary or secondary"
                                 " instances",
                                 errors.ECODE_INVAL)

    return inst_fn(self.cfg, self.op.node_uuid)

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        set(i.name for i in self._DetermineInstances())

    elif level == locking.LEVEL_NODEGROUP:
      # Lock node groups for all potential target nodes optimistically, needs
      # verification once nodes have been acquired
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)

    elif level == locking.LEVEL_NODE:
      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes

  def CheckPrereq(self):
    # Verify locks
    owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)

    need_nodes = self._DetermineNodes()

    if not owned_nodes.issuperset(need_nodes):
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
                                 " locks were acquired, current nodes are"
                                 " '%s', used to be '%s'; retry the"
                                 " operation" %
                                 (self.op.node_name,
                                  utils.CommaJoin(need_nodes),
                                  utils.CommaJoin(owned_nodes)),
                                 errors.ECODE_STATE)

    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
    if owned_groups != wanted_groups:
      raise errors.OpExecError("Node groups changed since locks were acquired,"
                               " current groups are '%s', used to be '%s';"
                               " retry the operation" %
                               (utils.CommaJoin(wanted_groups),
                                utils.CommaJoin(owned_groups)))

    # Determine affected instances
    self.instances = self._DetermineInstances()
    self.instance_names = [i.name for i in self.instances]

    if set(self.instance_names) != owned_instance_names:
      raise errors.OpExecError("Instances on node '%s' changed since locks"
                               " were acquired, current instances are '%s',"
                               " used to be '%s'; retry the operation" %
                               (self.op.node_name,
                                utils.CommaJoin(self.instance_names),
                                utils.CommaJoin(owned_instance_names)))

    if self.instance_names:
      self.LogInfo("Evacuating instances from node '%s': %s",
                   self.op.node_name,
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
    else:
      self.LogInfo("No instances to evacuate from node '%s'",
                   self.op.node_name)

    if self.op.remote_node is not None:
      for i in self.instances:
        if i.primary_node == self.op.remote_node_uuid:
          raise errors.OpPrereqError("Node %s is the primary node of"
                                     " instance %s, cannot use it as"
                                     " secondary" %
                                     (self.op.remote_node, i.name),
                                     errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)

    if not self.instance_names:
      # No instances to evacuate
      jobs = []

    elif self.op.iallocator is not None:
      # TODO: Implement relocation to other group
      req = iallocator.IAReqNodeEvac(evac_mode=self.op.mode,
                                     instances=list(self.instance_names))
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)

      ial.Run(self.op.iallocator)

      if not ial.success:
        raise errors.OpPrereqError("Can't compute node evacuation using"
                                   " iallocator '%s': %s" %
                                   (self.op.iallocator, ial.info),
                                   errors.ECODE_NORES)

      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)

    elif self.op.remote_node is not None:
      assert self.op.mode == constants.NODE_EVAC_SEC
      jobs = [
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
                                        remote_node=self.op.remote_node,
                                        disks=[],
                                        mode=constants.REPLACE_DISK_CHG,
                                        early_release=self.op.early_release)]
        for instance_name in self.instance_names]

    else:
      raise errors.ProgrammerError("No iallocator or remote node")

    return ResultWithJobs(jobs)
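    # ResultWithJobs submits the opcode lists built above as separate jobs:
    # with a remote node that is one OpInstanceReplaceDisks job per evacuated
    # instance, with an iallocator it is whatever LoadNodeEvacResult produced.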


class LUNodeMigrate(LogicalUnit):
  """Migrate all instances from a node.

  """
  HPATH = "node-migrate"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False

  def CheckArguments(self):
    pass

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    return {
      "NODE_NAME": self.op.node_name,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()]
    return (nl, nl)

  def CheckPrereq(self):
    pass

  def Exec(self, feedback_fn):
    # Prepare jobs for migration instances
    jobs = [
      [opcodes.OpInstanceMigrate(
        instance_name=inst.name,
        mode=self.op.mode,
        live=self.op.live,
        iallocator=self.op.iallocator,
        target_node=self.op.target_node,
        allow_runtime_changes=self.op.allow_runtime_changes,
        ignore_ipolicy=self.op.ignore_ipolicy)]
      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]

    # TODO: Run iallocator in this opcode and pass correct placement options to
    # OpInstanceMigrate. Since other jobs can modify the cluster between
    # running the iallocator and the actual migration, a good consistency model
    # will have to be found.

    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset([self.op.node_uuid]))

    return ResultWithJobs(jobs)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]

  return []
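  # Example (illustrative paths): for constants.ST_FILE this returns a single
  # argument list such as
  #   [["/srv/ganeti/file-storage", "/srv/ganeti/shared-file-storage"]]
  # while every other storage type gets no extra arguments ([]).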
1121

    
1122

    
1123
class LUNodeModifyStorage(NoHooksLU):
1124
  """Logical unit for modifying a storage volume on a node.
1125

1126
  """
1127
  REQ_BGL = False
1128

    
1129
  def CheckArguments(self):
1130
    (self.op.node_uuid, self.op.node_name) = \
1131
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1132

    
1133
    storage_type = self.op.storage_type
1134

    
1135
    try:
1136
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
1137
    except KeyError:
1138
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
1139
                                 " modified" % storage_type,
1140
                                 errors.ECODE_INVAL)
1141

    
1142
    diff = set(self.op.changes.keys()) - modifiable
1143
    if diff:
1144
      raise errors.OpPrereqError("The following fields can not be modified for"
1145
                                 " storage units of type '%s': %r" %
1146
                                 (storage_type, list(diff)),
1147
                                 errors.ECODE_INVAL)
1148

    
1149
  def CheckPrereq(self):
1150
    """Check prerequisites.
1151

1152
    """
1153
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1154

    
1155
  def ExpandNames(self):
1156
    self.needed_locks = {
1157
      locking.LEVEL_NODE: self.op.node_uuid,
1158
      }
1159

    
1160
  def Exec(self, feedback_fn):
1161
    """Computes the list of nodes and their attributes.
1162

1163
    """
1164
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1165
    result = self.rpc.call_storage_modify(self.op.node_uuid,
1166
                                          self.op.storage_type, st_args,
1167
                                          self.op.name, self.op.changes)
1168
    result.Raise("Failed to modify storage unit '%s' on %s" %
1169
                 (self.op.name, self.op.node_name))
1170

    
1171

    
1172
class NodeQuery(QueryBase):
1173
  FIELDS = query.NODE_FIELDS
1174

    
1175
  def ExpandNames(self, lu):
1176
    lu.needed_locks = {}
1177
    lu.share_locks = ShareAll()
1178

    
1179
    if self.names:
1180
      (self.wanted, _) = GetWantedNodes(lu, self.names)
1181
    else:
1182
      self.wanted = locking.ALL_SET
1183

    
1184
    self.do_locking = (self.use_locking and
1185
                       query.NQ_LIVE in self.requested_data)
1186

    
1187
    if self.do_locking:
1188
      # If any non-static field is requested we need to lock the nodes
1189
      lu.needed_locks[locking.LEVEL_NODE] = self.wanted
1190
      lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1191

    
1192
  def DeclareLocks(self, lu, level):
1193
    pass
1194

    
1195
  def _GetQueryData(self, lu):
1196
    """Computes the list of nodes and their attributes.
1197

1198
    """
1199
    all_info = lu.cfg.GetAllNodesInfo()
1200

    
1201
    node_uuids = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)
1202

    
1203
    # Gather data as requested
1204
    if query.NQ_LIVE in self.requested_data:
1205
      # filter out non-vm_capable nodes
1206
      toquery_node_uuids = [node.uuid for node in all_info.values()
1207
                            if node.vm_capable and node.uuid in node_uuids]
1208
      lvm_enabled = utils.storage.IsLvmEnabled(
1209
          lu.cfg.GetClusterInfo().enabled_disk_templates)
1210
      # FIXME: this per default asks for storage space information for all
1211
      # enabled disk templates. Fix this by making it possible to specify
1212
      # space report fields for specific disk templates.
1213
      raw_storage_units = utils.storage.GetStorageUnitsOfCluster(
1214
          lu.cfg, include_spindles=lvm_enabled)
1215
      storage_units = rpc.PrepareStorageUnitsForNodes(
1216
          lu.cfg, raw_storage_units, toquery_node_uuids)
1217
      default_hypervisor = lu.cfg.GetHypervisorType()
1218
      hvparams = lu.cfg.GetClusterInfo().hvparams[default_hypervisor]
1219
      hvspecs = [(default_hypervisor, hvparams)]
1220
      node_data = lu.rpc.call_node_info(toquery_node_uuids, storage_units,
1221
                                        hvspecs)
1222
      live_data = dict(
1223
          (uuid, rpc.MakeLegacyNodeInfo(nresult.payload,
1224
                                        require_spindles=lvm_enabled))
1225
          for (uuid, nresult) in node_data.items()
1226
          if not nresult.fail_msg and nresult.payload)
1227
    else:
1228
      live_data = None
1229

    
1230
    if query.NQ_INST in self.requested_data:
1231
      node_to_primary = dict([(uuid, set()) for uuid in node_uuids])
1232
      node_to_secondary = dict([(uuid, set()) for uuid in node_uuids])
1233

    
1234
      inst_data = lu.cfg.GetAllInstancesInfo()
1235
      inst_uuid_to_inst_name = {}
1236

    
1237
      for inst in inst_data.values():
1238
        inst_uuid_to_inst_name[inst.uuid] = inst.name
1239
        if inst.primary_node in node_to_primary:
1240
          node_to_primary[inst.primary_node].add(inst.uuid)
1241
        for secnode in inst.secondary_nodes:
1242
          if secnode in node_to_secondary:
1243
            node_to_secondary[secnode].add(inst.uuid)
1244
    else:
1245
      node_to_primary = None
1246
      node_to_secondary = None
1247
      inst_uuid_to_inst_name = None
1248

    
1249
    if query.NQ_OOB in self.requested_data:
1250
      oob_support = dict((uuid, bool(SupportsOob(lu.cfg, node)))
1251
                         for uuid, node in all_info.iteritems())
1252
    else:
1253
      oob_support = None
1254

    
1255
    if query.NQ_GROUP in self.requested_data:
1256
      groups = lu.cfg.GetAllNodeGroupsInfo()
1257
    else:
1258
      groups = {}
1259

    
1260
    return query.NodeQueryData([all_info[uuid] for uuid in node_uuids],
1261
                               live_data, lu.cfg.GetMasterNode(),
1262
                               node_to_primary, node_to_secondary,
1263
                               inst_uuid_to_inst_name, groups, oob_support,
1264
                               lu.cfg.GetClusterInfo())
1265

    
1266

    
1267
class LUNodeQuery(NoHooksLU):
  """Logical unit for querying nodes.

  """
  # pylint: disable=W0142
  REQ_BGL = False

  def CheckArguments(self):
    self.nq = NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
                        self.op.output_fields, self.op.use_locking)

  def ExpandNames(self):
    self.nq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.nq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    return self.nq.OldStyleQuery(self)


def _CheckOutputFields(fields, selected):
  """Checks whether all selected fields are valid according to fields.

  @type fields: L{utils.FieldSet}
  @param fields: fields set
  @type selected: L{utils.FieldSet}
  @param selected: fields set

  """
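  # For example, with a field set covering only "node" and "size", passing
  # selected=["node", "foo"] leaves delta == ["foo"] and triggers the error
  # below.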
  delta = fields.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)


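# LUNodeQueryvols backs the "gnt-node volumes" command: it asks every selected
# node for its logical volumes and maps them back to owning instances where
# possible.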
class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(constants.VF_NODE, constants.VF_PHYS,
                                      constants.VF_VG, constants.VF_NAME,
                                      constants.VF_SIZE, constants.VF_INSTANCE),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    volumes = self.rpc.call_node_volumes(node_uuids)

    ilist = self.cfg.GetAllInstancesInfo()
    vol2inst = MapInstanceLvsToNodes(ilist.values())
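    # vol2inst maps (node UUID, "<vg>/<lv name>") to the instance owning that
    # logical volume; volumes without an owner are reported as "-" below.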

    output = []
    for node_uuid in node_uuids:
      nresult = volumes[node_uuid]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue

      node_vols = sorted(nresult.payload,
                         key=operator.itemgetter(constants.VF_DEV))

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == constants.VF_NODE:
            val = self.cfg.GetNodeName(node_uuid)
          elif field == constants.VF_PHYS:
            val = vol[constants.VF_DEV]
          elif field == constants.VF_VG:
            val = vol[constants.VF_VG]
          elif field == constants.VF_NAME:
            val = vol[constants.VF_NAME]
          elif field == constants.VF_SIZE:
            val = int(float(vol[constants.VF_SIZE]))
          elif field == constants.VF_INSTANCE:
            inst = vol2inst.get((node_uuid, vol[constants.VF_VG] + "/" +
                                 vol[constants.VF_NAME]), None)
            if inst is not None:
              val = inst.name
            else:
              val = "-"
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


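# LUNodeQueryStorage backs "gnt-node list-storage": it lists the units of a
# single storage type (for example LVM physical volumes) on the selected
# nodes.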
class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)
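    # Each row in a node's payload is a list of values in the same order as
    # "fields", so field_idx/name_idx can be used below to pick out columns.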

    result = []

    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      nresult = data[node_uuid]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s",
                        node_name, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node_name
          elif field == constants.SF_TYPE:
            val = self.op.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result


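# LUNodeRemove backs "gnt-node remove"; the node may not be the master and may
# not hold any instances, neither as primary nor as secondary node.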
class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_uuid)
    except ValueError:
      pass
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    assert node is not None

    masternode = self.cfg.GetMasterNode()
    if node.uuid == masternode:
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)

    for _, instance in self.cfg.GetAllInstancesInfo().items():
      if node.uuid in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first" % instance.name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    logging.info("Stopping the node daemon and removing configs from node %s",
                 self.node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # Promote nodes to master candidate as needed
    AdjustCandidatePool(self, exceptions=[self.node.uuid])
    self.context.RemoveNode(self.node)

    # Run post hooks on the node before it's removed
    RunPostHook(self, self.node.name)

    # we have to call this by name rather than by UUID, as the node is no
    # longer in the config
    result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node_uuid = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node_uuid,
                                              constants.ETC_HOSTS_REMOVE,
                                              self.node.name, None)
      result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)


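# LURepairNodeStorage backs "gnt-node repair-storage"; only storage types
# whose valid operations include SO_FIX_CONSISTENCY (in practice LVM volume
# groups) can be repaired, and faulty disks of affected instances on other
# nodes abort the operation unless the opcode's ignore_consistency flag is
# set.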
class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    if (constants.SO_FIX_CONSISTENCY not in
        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " repaired" % storage_type,
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def _CheckFaultyDisks(self, instance, node_uuid):
    """Ensure faulty disks abort the opcode or at least warn."""
    try:
      if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                 node_uuid, True):
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                   " node '%s'" %
                                   (instance.name,
                                    self.cfg.GetNodeName(node_uuid)),
                                   errors.ECODE_STATE)
    except errors.OpPrereqError, err:
      if self.op.ignore_consistency:
        self.LogWarning(str(err.args[0]))
      else:
        raise

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

    # Check whether any instance on this node has faulty disks
    for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
      if not inst.disks_active:
        continue
      check_nodes = set(inst.all_nodes)
      check_nodes.discard(self.op.node_uuid)
      for inst_node_uuid in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_uuid)

  def Exec(self, feedback_fn):
    feedback_fn("Repairing storage unit '%s' on %s ..." %
                (self.op.name, self.op.node_name))

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_execute(self.op.node_uuid,
                                           self.op.storage_type, st_args,
                                           self.op.name,
                                           constants.SO_FIX_CONSISTENCY)
    result.Raise("Failed to repair storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))