#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with nodes."""

import logging
import operator

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import qlang
from ganeti import query
from ganeti import rpc
from ganeti import utils
from ganeti.masterd import iallocator

from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
  ResultWithJobs
from ganeti.cmdlib.common import CheckParamsNotGlobal, \
  MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  IsExclusiveStorageEnabledNode, CheckNodePVs, \
  RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
  CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
  AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
  GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
  FindFaultyInstanceDisks, CheckStorageTypeEnabled


def _DecideSelfPromotion(lu, exceptions=None):
52
  """Decide whether I should promote myself as a master candidate.
53

54
  """
55
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
56
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
57
  # the new node will increase mc_max by one, so:
58
  mc_should = min(mc_should + 1, cp_size)
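  # e.g. with cp_size=10, mc_now=3 and mc_should=3, the extra node makes
  # mc_should 4, so 3 < 4 and this node will promote itself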
59
  return mc_now < mc_should
60

    
61

    
62
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
63
  """Ensure that a node has the given secondary ip.
64

65
  @type lu: L{LogicalUnit}
66
  @param lu: the LU on behalf of which we make the check
67
  @type node: L{objects.Node}
68
  @param node: the node to check
69
  @type secondary_ip: string
70
  @param secondary_ip: the ip to check
71
  @type prereq: boolean
72
  @param prereq: whether to throw a prerequisite or an execute error
73
  @raise errors.OpPrereqError: if the node doesn't have the ip,
74
  and prereq=True
75
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
76

77
  """
78
  # this can be called with a new node, which has no UUID yet, so perform the
79
  # RPC call using its name
80
  result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
81
  result.Raise("Failure checking secondary ip on node %s" % node.name,
82
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
83
  if not result.payload:
84
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
85
           " please fix and re-run this command" % secondary_ip)
86
    if prereq:
87
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
88
    else:
89
      raise errors.OpExecError(msg)
90

    
91

    
92
class LUNodeAdd(LogicalUnit):
93
  """Logical unit for adding node to the cluster.
94

95
  """
96
  HPATH = "node-add"
97
  HTYPE = constants.HTYPE_NODE
98
  _NFLAGS = ["master_capable", "vm_capable"]
99

    
100
  def CheckArguments(self):
101
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
102
    # validate/normalize the node name
103
    self.hostname = netutils.GetHostname(name=self.op.node_name,
104
                                         family=self.primary_ip_family)
105
    self.op.node_name = self.hostname.name
106

    
107
    if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
108
      raise errors.OpPrereqError("Cannot readd the master node",
109
                                 errors.ECODE_STATE)
110

    
111
    if self.op.readd and self.op.group:
112
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
113
                                 " being readded", errors.ECODE_INVAL)
114

    
115
    # OpenvSwitch: Warn user if link is missing
116
    if (self.op.ndparams[constants.ND_OVS] and not
117
        self.op.ndparams[constants.ND_OVS_LINK]):
118
      self.LogInfo("No physical interface for OpenvSwitch was given."
119
                   " OpenvSwitch will not have an outside connection. This"
120
                   " might not be what you want.")
121
    # OpenvSwitch: Fail if parameters are given, but OVS is not enabled.
122
    if (not self.op.ndparams[constants.ND_OVS] and
123
        (self.op.ndparams[constants.ND_OVS_NAME] or
124
         self.op.ndparams[constants.ND_OVS_LINK])):
125
      raise errors.OpPrereqError("OpenvSwitch name or link were given, but"
126
                                 " OpenvSwitch is not enabled. Please enable"
127
                                 " OpenvSwitch with --ovs", errors.ECODE_INVAL)
128

    
129
  def BuildHooksEnv(self):
130
    """Build hooks env.
131

132
    This will run on all nodes before, and on all nodes + the new node after.
133

134
    """
135
    return {
136
      "OP_TARGET": self.op.node_name,
137
      "NODE_NAME": self.op.node_name,
138
      "NODE_PIP": self.op.primary_ip,
139
      "NODE_SIP": self.op.secondary_ip,
140
      "MASTER_CAPABLE": str(self.op.master_capable),
141
      "VM_CAPABLE": str(self.op.vm_capable),
142
      }
143

    
144
  def BuildHooksNodes(self):
145
    """Build hooks nodes.
146

147
    """
148
    hook_nodes = self.cfg.GetNodeList()
149
    new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
150
    if new_node_info is not None:
151
      # Exclude added node
152
      hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))
153

    
154
    # add the new node as post hook node by name; it does not have a UUID yet
155
    return (hook_nodes, hook_nodes, [self.op.node_name, ])
156

    
157
  def CheckPrereq(self):
158
    """Check prerequisites.
159

160
    This checks:
161
     - the new node is not already in the config
162
     - it is resolvable
163
     - its parameters (single/dual homed) match the cluster
164

165
    Any errors are signaled by raising errors.OpPrereqError.
166

167
    """
168
    node_name = self.hostname.name
169
    self.op.primary_ip = self.hostname.ip
170
    if self.op.secondary_ip is None:
171
      if self.primary_ip_family == netutils.IP6Address.family:
172
        raise errors.OpPrereqError("When using an IPv6 primary address, a valid"
173
                                   " IPv4 address must be given as secondary",
174
                                   errors.ECODE_INVAL)
175
      self.op.secondary_ip = self.op.primary_ip
176

    
177
    secondary_ip = self.op.secondary_ip
178
    if not netutils.IP4Address.IsValid(secondary_ip):
179
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
180
                                 " address" % secondary_ip, errors.ECODE_INVAL)
181

    
182
    existing_node_info = self.cfg.GetNodeInfoByName(node_name)
183
    if not self.op.readd and existing_node_info is not None:
184
      raise errors.OpPrereqError("Node %s is already in the configuration" %
185
                                 node_name, errors.ECODE_EXISTS)
186
    elif self.op.readd and existing_node_info is None:
187
      raise errors.OpPrereqError("Node %s is not in the configuration" %
188
                                 node_name, errors.ECODE_NOENT)
189

    
190
    self.changed_primary_ip = False
191

    
192
    for existing_node in self.cfg.GetAllNodesInfo().values():
193
      if self.op.readd and node_name == existing_node.name:
194
        if existing_node.secondary_ip != secondary_ip:
195
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
196
                                     " address configuration as before",
197
                                     errors.ECODE_INVAL)
198
        if existing_node.primary_ip != self.op.primary_ip:
199
          self.changed_primary_ip = True
200

    
201
        continue
202

    
203
      if (existing_node.primary_ip == self.op.primary_ip or
204
          existing_node.secondary_ip == self.op.primary_ip or
205
          existing_node.primary_ip == secondary_ip or
206
          existing_node.secondary_ip == secondary_ip):
207
        raise errors.OpPrereqError("New node ip address(es) conflict with"
208
                                   " existing node %s" % existing_node.name,
209
                                   errors.ECODE_NOTUNIQUE)
210

    
211
    # After this 'if' block, None is no longer a valid value for the
212
    # _capable op attributes
213
    if self.op.readd:
214
      assert existing_node_info is not None, \
215
        "Can't retrieve locked node %s" % node_name
216
      for attr in self._NFLAGS:
217
        if getattr(self.op, attr) is None:
218
          setattr(self.op, attr, getattr(existing_node_info, attr))
219
    else:
220
      for attr in self._NFLAGS:
221
        if getattr(self.op, attr) is None:
222
          setattr(self.op, attr, True)
223

    
224
    if self.op.readd and not self.op.vm_capable:
225
      pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
226
      if pri or sec:
227
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
228
                                   " flag set to false, but it already holds"
229
                                   " instances" % node_name,
230
                                   errors.ECODE_STATE)
231

    
232
    # check that the type of the node (single versus dual homed) is the
233
    # same as for the master
234
    myself = self.cfg.GetMasterNodeInfo()
235
    master_singlehomed = myself.secondary_ip == myself.primary_ip
236
    newbie_singlehomed = secondary_ip == self.op.primary_ip
237
    if master_singlehomed != newbie_singlehomed:
238
      if master_singlehomed:
239
        raise errors.OpPrereqError("The master has no secondary ip but the"
240
                                   " new node has one",
241
                                   errors.ECODE_INVAL)
242
      else:
243
        raise errors.OpPrereqError("The master has a secondary ip but the"
244
                                   " new node doesn't have one",
245
                                   errors.ECODE_INVAL)
246

    
247
    # checks reachability
248
    if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
249
      raise errors.OpPrereqError("Node not reachable by ping",
250
                                 errors.ECODE_ENVIRON)
251

    
252
    if not newbie_singlehomed:
253
      # check reachability from my secondary ip to newbie's secondary ip
254
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
255
                              source=myself.secondary_ip):
256
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
257
                                   " based ping to node daemon port",
258
                                   errors.ECODE_ENVIRON)
259

    
260
    if self.op.readd:
261
      exceptions = [existing_node_info.uuid]
262
    else:
263
      exceptions = []
264

    
265
    if self.op.master_capable:
266
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
267
    else:
268
      self.master_candidate = False
269

    
270
    if self.op.readd:
271
      self.new_node = existing_node_info
272
    else:
273
      node_group = self.cfg.LookupNodeGroup(self.op.group)
274
      self.new_node = objects.Node(name=node_name,
275
                                   primary_ip=self.op.primary_ip,
276
                                   secondary_ip=secondary_ip,
277
                                   master_candidate=self.master_candidate,
278
                                   offline=False, drained=False,
279
                                   group=node_group, ndparams={})
280

    
281
    if self.op.ndparams:
282
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
283
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
284
                           "node", "cluster or group")
285

    
286
    if self.op.hv_state:
287
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)
288

    
289
    if self.op.disk_state:
290
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)
291

    
292
    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
293
    #       it a property on the base class.
294
    rpcrunner = rpc.DnsOnlyRunner()
295
    result = rpcrunner.call_version([node_name])[node_name]
296
    result.Raise("Can't get version information from node %s" % node_name)
297
    if constants.PROTOCOL_VERSION == result.payload:
298
      logging.info("Communication to node %s fine, sw version %s match",
299
                   node_name, result.payload)
300
    else:
301
      raise errors.OpPrereqError("Version mismatch master version %s,"
302
                                 " node version %s" %
303
                                 (constants.PROTOCOL_VERSION, result.payload),
304
                                 errors.ECODE_ENVIRON)
305

    
306
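    # sanity-check the volume group and physical volumes on the new node
    # before actually adding it to the cluster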
    vg_name = self.cfg.GetVGName()
307
    if vg_name is not None:
308
      vparams = {constants.NV_PVLIST: [vg_name]}
309
      excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
310
      cname = self.cfg.GetClusterName()
311
      result = rpcrunner.call_node_verify_light(
312
          [node_name], vparams, cname,
313
          self.cfg.GetClusterInfo().hvparams)[node_name]
314
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
315
      if errmsgs:
316
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
317
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)
318

    
319
  def Exec(self, feedback_fn):
320
    """Adds the new node to the cluster.
321

322
    """
323
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
324
      "Not owning BGL"
325

    
326
    # We are adding a new node, so we assume it's powered
327
    self.new_node.powered = True
328

    
329
    # for re-adds, reset the offline/drained/master-candidate flags;
330
    # we need to reset here, otherwise offline would prevent RPC calls
331
    # later in the procedure; this also means that if the re-add
332
    # fails, we are left with a non-offlined, broken node
333
    if self.op.readd:
334
      self.new_node.drained = False
335
      self.LogInfo("Readding a node, the offline/drained flags were reset")
336
      # if we demote the node, we do cleanup later in the procedure
337
      self.new_node.master_candidate = self.master_candidate
338
      if self.changed_primary_ip:
339
        self.new_node.primary_ip = self.op.primary_ip
340

    
341
    # copy the master/vm_capable flags
342
    for attr in self._NFLAGS:
343
      setattr(self.new_node, attr, getattr(self.op, attr))
344

    
345
    # notify the user about any possible mc promotion
346
    if self.new_node.master_candidate:
347
      self.LogInfo("Node will be a master candidate")
348

    
349
    if self.op.ndparams:
350
      self.new_node.ndparams = self.op.ndparams
351
    else:
352
      self.new_node.ndparams = {}
353

    
354
    if self.op.hv_state:
355
      self.new_node.hv_state_static = self.new_hv_state
356

    
357
    if self.op.disk_state:
358
      self.new_node.disk_state_static = self.new_disk_state
359

    
360
    # Add node to our /etc/hosts, and add key to known_hosts
361
    if self.cfg.GetClusterInfo().modify_etc_hosts:
362
      master_node = self.cfg.GetMasterNode()
363
      result = self.rpc.call_etc_hosts_modify(
364
                 master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
365
                 self.hostname.ip)
366
      result.Raise("Can't update hosts file with new host data")
367

    
368
    if self.new_node.secondary_ip != self.new_node.primary_ip:
369
      _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
370
                               False)
371

    
372
    node_verifier_uuids = [self.cfg.GetMasterNode()]
373
    node_verify_param = {
374
      constants.NV_NODELIST: ([self.new_node.name], {}),
375
      # TODO: do a node-net-test as well?
376
    }
377

    
378
    result = self.rpc.call_node_verify(
379
               node_verifier_uuids, node_verify_param,
380
               self.cfg.GetClusterName(),
381
               self.cfg.GetClusterInfo().hvparams)
382
    for verifier in node_verifier_uuids:
383
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
384
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
385
      if nl_payload:
386
        for failed in nl_payload:
387
          feedback_fn("ssh/hostname verification failed"
388
                      " (checking from %s): %s" %
389
                      (verifier, nl_payload[failed]))
390
        raise errors.OpExecError("ssh/hostname verification failed")
391

    
392
    # OpenvSwitch initialization on the node
393
    if self.new_node.ndparams[constants.ND_OVS]:
394
      result = self.rpc.call_node_configure_ovs(
395
                 self.new_node.name,
396
                 self.new_node.ndparams[constants.ND_OVS_NAME],
397
                 self.new_node.ndparams[constants.ND_OVS_LINK])
398

    
399
    if self.op.readd:
400
      self.context.ReaddNode(self.new_node)
401
      RedistributeAncillaryFiles(self)
402
      # make sure we redistribute the config
403
      self.cfg.Update(self.new_node, feedback_fn)
404
      # and make sure the new node will not have old files around
405
      if not self.new_node.master_candidate:
406
        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
407
        result.Warn("Node failed to demote itself from master candidate status",
408
                    self.LogWarning)
409
    else:
410
      self.context.AddNode(self.new_node, self.proc.GetECId())
411
      RedistributeAncillaryFiles(self)
412

    
413

    
414
class LUNodeSetParams(LogicalUnit):
415
  """Modifies the parameters of a node.
416

417
  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
418
      to the node role (as _ROLE_*)
419
  @cvar _R2F: a dictionary from node role to tuples of flags
420
  @cvar _FLAGS: a list of attribute names corresponding to the flags
421

422
  """
423
  HPATH = "node-modify"
424
  HTYPE = constants.HTYPE_NODE
425
  REQ_BGL = False
426
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
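  # Map of (master_candidate, drained, offline) flag tuples to node roles;
  # the flags are mutually exclusive, so only these four combinations occur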
427
  _F2R = {
428
    (True, False, False): _ROLE_CANDIDATE,
429
    (False, True, False): _ROLE_DRAINED,
430
    (False, False, True): _ROLE_OFFLINE,
431
    (False, False, False): _ROLE_REGULAR,
432
    }
433
  _R2F = dict((v, k) for k, v in _F2R.items())
434
  _FLAGS = ["master_candidate", "drained", "offline"]
435

    
436
  def CheckArguments(self):
437
    (self.op.node_uuid, self.op.node_name) = \
438
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
439
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
440
                self.op.master_capable, self.op.vm_capable,
441
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
442
                self.op.disk_state]
443
    if all_mods.count(None) == len(all_mods):
444
      raise errors.OpPrereqError("Please pass at least one modification",
445
                                 errors.ECODE_INVAL)
446
    if all_mods.count(True) > 1:
447
      raise errors.OpPrereqError("Can't set the node into more than one"
448
                                 " state at the same time",
449
                                 errors.ECODE_INVAL)
450

    
451
    # Boolean value that tells us whether we might be demoting from MC
452
    self.might_demote = (self.op.master_candidate is False or
453
                         self.op.offline is True or
454
                         self.op.drained is True or
455
                         self.op.master_capable is False)
456

    
457
    if self.op.secondary_ip:
458
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
459
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
460
                                   " address" % self.op.secondary_ip,
461
                                   errors.ECODE_INVAL)
462

    
463
    self.lock_all = self.op.auto_promote and self.might_demote
464
    self.lock_instances = self.op.secondary_ip is not None
465

    
466
  def _InstanceFilter(self, instance):
467
    """Filter for getting affected instances.
468

469
    """
470
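    # Only instances using internal mirroring (e.g. DRBD) and located on this
    # node are affected by a change of its secondary IP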
    return (instance.disk_template in constants.DTS_INT_MIRROR and
471
            self.op.node_uuid in instance.all_nodes)
472

    
473
  def ExpandNames(self):
474
    if self.lock_all:
475
      self.needed_locks = {
476
        locking.LEVEL_NODE: locking.ALL_SET,
477

    
478
        # Block allocations when all nodes are locked
479
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
480
        }
481
    else:
482
      self.needed_locks = {
483
        locking.LEVEL_NODE: self.op.node_uuid,
484
        }
485

    
486
    # Since modifying a node can have severe effects on currently running
487
    # operations, the resource lock is at least acquired in shared mode
488
    self.needed_locks[locking.LEVEL_NODE_RES] = \
489
      self.needed_locks[locking.LEVEL_NODE]
490

    
491
    # Get all locks except nodes in shared mode; they are not used for anything
492
    # but read-only access
493
    self.share_locks = ShareAll()
494
    self.share_locks[locking.LEVEL_NODE] = 0
495
    self.share_locks[locking.LEVEL_NODE_RES] = 0
496
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 0
497

    
498
    if self.lock_instances:
499
      self.needed_locks[locking.LEVEL_INSTANCE] = \
500
        self.cfg.GetInstanceNames(
501
          self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())
502

    
503
  def BuildHooksEnv(self):
504
    """Build hooks env.
505

506
    This runs on the master node.
507

508
    """
509
    return {
510
      "OP_TARGET": self.op.node_name,
511
      "MASTER_CANDIDATE": str(self.op.master_candidate),
512
      "OFFLINE": str(self.op.offline),
513
      "DRAINED": str(self.op.drained),
514
      "MASTER_CAPABLE": str(self.op.master_capable),
515
      "VM_CAPABLE": str(self.op.vm_capable),
516
      }
517

    
518
  def BuildHooksNodes(self):
519
    """Build hooks nodes.
520

521
    """
522
    nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
523
    return (nl, nl)
524

    
525
  def CheckPrereq(self):
526
    """Check prerequisites.
527

528
    This only checks the instance list against the existing names.
529

530
    """
531
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
532
    if self.lock_instances:
533
      affected_instances = \
534
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
535

    
536
      # Verify instance locks
537
      owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
538
      wanted_instance_names = frozenset([inst.name for inst in
539
                                         affected_instances.values()])
540
      if wanted_instance_names - owned_instance_names:
541
        raise errors.OpPrereqError("Instances affected by changing node %s's"
542
                                   " secondary IP address have changed since"
543
                                   " locks were acquired, wanted '%s', have"
544
                                   " '%s'; retry the operation" %
545
                                   (node.name,
546
                                    utils.CommaJoin(wanted_instance_names),
547
                                    utils.CommaJoin(owned_instance_names)),
548
                                   errors.ECODE_STATE)
549
    else:
550
      affected_instances = None
551

    
552
    if (self.op.master_candidate is not None or
553
        self.op.drained is not None or
554
        self.op.offline is not None):
555
      # we can't change the master's node flags
556
      if node.uuid == self.cfg.GetMasterNode():
557
        raise errors.OpPrereqError("The master role can be changed"
558
                                   " only via master-failover",
559
                                   errors.ECODE_INVAL)
560

    
561
    if self.op.master_candidate and not node.master_capable:
562
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
563
                                 " it a master candidate" % node.name,
564
                                 errors.ECODE_STATE)
565

    
566
    if self.op.vm_capable is False:
567
      (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
568
      if ipri or isec:
569
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
570
                                   " the vm_capable flag" % node.name,
571
                                   errors.ECODE_STATE)
572

    
573
    if node.master_candidate and self.might_demote and not self.lock_all:
574
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
575
      # check if after removing the current node, we're missing master
576
      # candidates
577
      (mc_remaining, mc_should, _) = \
578
          self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
579
      if mc_remaining < mc_should:
580
        raise errors.OpPrereqError("Not enough master candidates, please"
581
                                   " pass auto promote option to allow"
582
                                   " promotion (--auto-promote or RAPI"
583
                                   " auto_promote=True)", errors.ECODE_STATE)
584

    
585
    self.old_flags = old_flags = (node.master_candidate,
586
                                  node.drained, node.offline)
587
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
588
    self.old_role = old_role = self._F2R[old_flags]
589

    
590
    # Check for ineffective changes
591
    for attr in self._FLAGS:
592
      if getattr(self.op, attr) is False and getattr(node, attr) is False:
593
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
594
        setattr(self.op, attr, None)
595

    
596
    # Past this point, any flag change to False means a transition
597
    # away from the respective state, as only real changes are kept
598

    
599
    # TODO: We might query the real power state if it supports OOB
600
    if SupportsOob(self.cfg, node):
601
      if self.op.offline is False and not (node.powered or
602
                                           self.op.powered is True):
603
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
604
                                    " offline status can be reset") %
605
                                   self.op.node_name, errors.ECODE_STATE)
606
    elif self.op.powered is not None:
607
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
608
                                  " as it does not support out-of-band"
609
                                  " handling") % self.op.node_name,
610
                                 errors.ECODE_STATE)
611

    
612
    # If we're being de-offlined or un-drained, we'll MC ourselves if needed
613
    if (self.op.drained is False or self.op.offline is False or
614
        (self.op.master_capable and not node.master_capable)):
615
      if _DecideSelfPromotion(self):
616
        self.op.master_candidate = True
617
        self.LogInfo("Auto-promoting node to master candidate")
618

    
619
    # If we're no longer master capable, we'll demote ourselves from MC
620
    if self.op.master_capable is False and node.master_candidate:
621
      self.LogInfo("Demoting from master candidate")
622
      self.op.master_candidate = False
623

    
624
    # Compute new role
625
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
626
    if self.op.master_candidate:
627
      new_role = self._ROLE_CANDIDATE
628
    elif self.op.drained:
629
      new_role = self._ROLE_DRAINED
630
    elif self.op.offline:
631
      new_role = self._ROLE_OFFLINE
632
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
633
      # False is still in new flags, which means we're un-setting (the
634
      # only) True flag
635
      new_role = self._ROLE_REGULAR
636
    else: # no new flags, nothing, keep old role
637
      new_role = old_role
638

    
639
    self.new_role = new_role
640

    
641
    if old_role == self._ROLE_OFFLINE and new_role != old_role:
642
      # Trying to transition out of offline status
643
      result = self.rpc.call_version([node.uuid])[node.uuid]
644
      if result.fail_msg:
645
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
646
                                   " to report its version: %s" %
647
                                   (node.name, result.fail_msg),
648
                                   errors.ECODE_STATE)
649
      else:
650
        self.LogWarning("Transitioning node from offline to online state"
651
                        " without using re-add. Please make sure the node"
652
                        " is healthy!")
653

    
654
    # When changing the secondary ip, verify if this is a single-homed to
655
    # multi-homed transition or vice versa, and apply the relevant
656
    # restrictions.
657
    if self.op.secondary_ip:
658
      # Ok even without locking, because this can't be changed by any LU
659
      master = self.cfg.GetMasterNodeInfo()
660
      master_singlehomed = master.secondary_ip == master.primary_ip
661
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
662
        if self.op.force and node.uuid == master.uuid:
663
          self.LogWarning("Transitioning from single-homed to multi-homed"
664
                          " cluster; all nodes will require a secondary IP"
665
                          " address")
666
        else:
667
          raise errors.OpPrereqError("Changing the secondary ip on a"
668
                                     " single-homed cluster requires the"
669
                                     " --force option to be passed, and the"
670
                                     " target node to be the master",
671
                                     errors.ECODE_INVAL)
672
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
673
        if self.op.force and node.uuid == master.uuid:
674
          self.LogWarning("Transitioning from multi-homed to single-homed"
675
                          " cluster; secondary IP addresses will have to be"
676
                          " removed")
677
        else:
678
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
679
                                     " same as the primary IP on a multi-homed"
680
                                     " cluster, unless the --force option is"
681
                                     " passed, and the target node is the"
682
                                     " master", errors.ECODE_INVAL)
683

    
684
      assert not (set([inst.name for inst in affected_instances.values()]) -
685
                  self.owned_locks(locking.LEVEL_INSTANCE))
686

    
687
      if node.offline:
688
        if affected_instances:
689
          msg = ("Cannot change secondary IP address: offline node has"
690
                 " instances (%s) configured to use it" %
691
                 utils.CommaJoin(
692
                   [inst.name for inst in affected_instances.values()]))
693
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
694
      else:
695
        # On online nodes, check that no instances are running, and that
696
        # the node has the new ip and we can reach it.
697
        for instance in affected_instances.values():
698
          CheckInstanceState(self, instance, INSTANCE_DOWN,
699
                             msg="cannot change secondary ip")
700

    
701
        _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
702
        if master.uuid != node.uuid:
703
          # check reachability from master secondary ip to new secondary ip
704
          if not netutils.TcpPing(self.op.secondary_ip,
705
                                  constants.DEFAULT_NODED_PORT,
706
                                  source=master.secondary_ip):
707
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
708
                                       " based ping to node daemon port",
709
                                       errors.ECODE_ENVIRON)
710

    
711
    if self.op.ndparams:
712
      new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
713
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
714
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
715
                           "node", "cluster or group")
716
      self.new_ndparams = new_ndparams
717

    
718
    if self.op.hv_state:
719
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
720
                                                node.hv_state_static)
721

    
722
    if self.op.disk_state:
723
      self.new_disk_state = \
724
        MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)
725

    
726
  def Exec(self, feedback_fn):
727
    """Modifies a node.
728

729
    """
730
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
731
    result = []
732

    
733
    if self.op.ndparams:
734
      node.ndparams = self.new_ndparams
735

    
736
    if self.op.powered is not None:
737
      node.powered = self.op.powered
738

    
739
    if self.op.hv_state:
740
      node.hv_state_static = self.new_hv_state
741

    
742
    if self.op.disk_state:
743
      node.disk_state_static = self.new_disk_state
744

    
745
    for attr in ["master_capable", "vm_capable"]:
746
      val = getattr(self.op, attr)
747
      if val is not None:
748
        setattr(node, attr, val)
749
        result.append((attr, str(val)))
750

    
751
    if self.new_role != self.old_role:
752
      # Tell the node to demote itself, if no longer MC and not offline
753
      if self.old_role == self._ROLE_CANDIDATE and \
754
          self.new_role != self._ROLE_OFFLINE:
755
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
756
        if msg:
757
          self.LogWarning("Node failed to demote itself: %s", msg)
758

    
759
      new_flags = self._R2F[self.new_role]
760
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
761
        if of != nf:
762
          result.append((desc, str(nf)))
763
      (node.master_candidate, node.drained, node.offline) = new_flags
764

    
765
      # we locked all nodes, so we adjust the CP before updating this node
766
      if self.lock_all:
767
        AdjustCandidatePool(self, [node.uuid])
768

    
769
    if self.op.secondary_ip:
770
      node.secondary_ip = self.op.secondary_ip
771
      result.append(("secondary_ip", self.op.secondary_ip))
772

    
773
    # this will trigger configuration file update, if needed
774
    self.cfg.Update(node, feedback_fn)
775

    
776
    # this will trigger job queue propagation or cleanup if the mc
777
    # flag changed
778
    if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
779
      self.context.ReaddNode(node)
780

    
781
    return result
782

    
783

    
784
class LUNodePowercycle(NoHooksLU):
785
  """Powercycles a node.
786

787
  """
788
  REQ_BGL = False
789

    
790
  def CheckArguments(self):
791
    (self.op.node_uuid, self.op.node_name) = \
792
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
793

    
794
    if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
795
      raise errors.OpPrereqError("The node is the master and the force"
796
                                 " parameter was not set",
797
                                 errors.ECODE_INVAL)
798

    
799
  def ExpandNames(self):
800
    """Locking for PowercycleNode.
801

802
    This is a last-resort option and shouldn't block on other
803
    jobs. Therefore, we grab no locks.
804

805
    """
806
    self.needed_locks = {}
807

    
808
  def Exec(self, feedback_fn):
809
    """Reboots a node.
810

811
    """
812
    default_hypervisor = self.cfg.GetHypervisorType()
813
    hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
814
    result = self.rpc.call_node_powercycle(self.op.node_uuid,
815
                                           default_hypervisor,
816
                                           hvparams)
817
    result.Raise("Failed to schedule the reboot")
818
    return result.payload
819

    
820

    
821
def _GetNodeInstancesInner(cfg, fn):
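  """Returns all instances for which the filter function fn returns True.

  """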
822
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
823

    
824

    
825
def _GetNodePrimaryInstances(cfg, node_uuid):
826
  """Returns primary instances on a node.
827

828
  """
829
  return _GetNodeInstancesInner(cfg,
830
                                lambda inst: node_uuid == inst.primary_node)
831

    
832

    
833
def _GetNodeSecondaryInstances(cfg, node_uuid):
834
  """Returns secondary instances on a node.
835

836
  """
837
  return _GetNodeInstancesInner(cfg,
838
                                lambda inst: node_uuid in inst.secondary_nodes)
839

    
840

    
841
def _GetNodeInstances(cfg, node_uuid):
842
  """Returns a list of all primary and secondary instances on a node.
843

844
  """
845

    
846
  return _GetNodeInstancesInner(cfg, lambda inst: node_uuid in inst.all_nodes)
847

    
848

    
849
class LUNodeEvacuate(NoHooksLU):
850
  """Evacuates instances off a list of nodes.
851

852
  """
853
  REQ_BGL = False
854

    
855
  _MODE2IALLOCATOR = {
856
    constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI,
857
    constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC,
858
    constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL,
859
    }
860
  assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES
861
  assert (frozenset(_MODE2IALLOCATOR.values()) ==
862
          constants.IALLOCATOR_NEVAC_MODES)
863

    
864
  def CheckArguments(self):
865
    CheckIAllocatorOrNode(self, "iallocator", "remote_node")
866

    
867
  def ExpandNames(self):
868
    (self.op.node_uuid, self.op.node_name) = \
869
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
870

    
871
    if self.op.remote_node is not None:
872
      (self.op.remote_node_uuid, self.op.remote_node) = \
873
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
874
                              self.op.remote_node)
875
      assert self.op.remote_node
876

    
877
      if self.op.node_uuid == self.op.remote_node_uuid:
878
        raise errors.OpPrereqError("Can not use evacuated node as a new"
879
                                   " secondary node", errors.ECODE_INVAL)
880

    
881
      if self.op.mode != constants.NODE_EVAC_SEC:
882
        raise errors.OpPrereqError("Without the use of an iallocator only"
883
                                   " secondary instances can be evacuated",
884
                                   errors.ECODE_INVAL)
885

    
886
    # Declare locks
887
    self.share_locks = ShareAll()
888
    self.needed_locks = {
889
      locking.LEVEL_INSTANCE: [],
890
      locking.LEVEL_NODEGROUP: [],
891
      locking.LEVEL_NODE: [],
892
      }
893

    
894
    # Determine nodes (via group) optimistically, needs verification once locks
895
    # have been acquired
896
    self.lock_nodes = self._DetermineNodes()
897

    
898
  def _DetermineNodes(self):
899
    """Gets the list of node UUIDs to operate on.
900

901
    """
902
    if self.op.remote_node is None:
903
      # Iallocator will choose any node(s) in the same group
904
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
905
    else:
906
      group_nodes = frozenset([self.op.remote_node_uuid])
907

    
908
    # Determine nodes to be locked
909
    return set([self.op.node_uuid]) | group_nodes
910

    
911
  def _DetermineInstances(self):
912
    """Builds list of instances to operate on.
913

914
    """
915
    assert self.op.mode in constants.NODE_EVAC_MODES
916

    
917
    if self.op.mode == constants.NODE_EVAC_PRI:
918
      # Primary instances only
919
      inst_fn = _GetNodePrimaryInstances
920
      assert self.op.remote_node is None, \
921
        "Evacuating primary instances requires iallocator"
922
    elif self.op.mode == constants.NODE_EVAC_SEC:
923
      # Secondary instances only
924
      inst_fn = _GetNodeSecondaryInstances
925
    else:
926
      # All instances
927
      assert self.op.mode == constants.NODE_EVAC_ALL
928
      inst_fn = _GetNodeInstances
929
      # TODO: In 2.6, change the iallocator interface to take an evacuation mode
930
      # per instance
931
      raise errors.OpPrereqError("Due to an issue with the iallocator"
932
                                 " interface it is not possible to evacuate"
933
                                 " all instances at once; specify explicitly"
934
                                 " whether to evacuate primary or secondary"
935
                                 " instances",
936
                                 errors.ECODE_INVAL)
937

    
938
    return inst_fn(self.cfg, self.op.node_uuid)
939

    
940
  def DeclareLocks(self, level):
941
    if level == locking.LEVEL_INSTANCE:
942
      # Lock instances optimistically, needs verification once node and group
943
      # locks have been acquired
944
      self.needed_locks[locking.LEVEL_INSTANCE] = \
945
        set(i.name for i in self._DetermineInstances())
946

    
947
    elif level == locking.LEVEL_NODEGROUP:
948
      # Lock node groups for all potential target nodes optimistically, needs
949
      # verification once nodes have been acquired
950
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
951
        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)
952

    
953
    elif level == locking.LEVEL_NODE:
954
      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
955

    
956
  def CheckPrereq(self):
957
    # Verify locks
958
    owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
959
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
960
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
961

    
962
    need_nodes = self._DetermineNodes()
963

    
964
    if not owned_nodes.issuperset(need_nodes):
965
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
966
                                 " locks were acquired, current nodes are"
967
                                 " are '%s', used to be '%s'; retry the"
968
                                 " operation" %
969
                                 (self.op.node_name,
970
                                  utils.CommaJoin(need_nodes),
971
                                  utils.CommaJoin(owned_nodes)),
972
                                 errors.ECODE_STATE)
973

    
974
    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
975
    if owned_groups != wanted_groups:
976
      raise errors.OpExecError("Node groups changed since locks were acquired,"
977
                               " current groups are '%s', used to be '%s';"
978
                               " retry the operation" %
979
                               (utils.CommaJoin(wanted_groups),
980
                                utils.CommaJoin(owned_groups)))
981

    
982
    # Determine affected instances
983
    self.instances = self._DetermineInstances()
984
    self.instance_names = [i.name for i in self.instances]
985

    
986
    if set(self.instance_names) != owned_instance_names:
987
      raise errors.OpExecError("Instances on node '%s' changed since locks"
988
                               " were acquired, current instances are '%s',"
989
                               " used to be '%s'; retry the operation" %
990
                               (self.op.node_name,
991
                                utils.CommaJoin(self.instance_names),
992
                                utils.CommaJoin(owned_instance_names)))
993

    
994
    if self.instance_names:
995
      self.LogInfo("Evacuating instances from node '%s': %s",
996
                   self.op.node_name,
997
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
998
    else:
999
      self.LogInfo("No instances to evacuate from node '%s'",
1000
                   self.op.node_name)
1001

    
1002
    if self.op.remote_node is not None:
1003
      for i in self.instances:
1004
        if i.primary_node == self.op.remote_node_uuid:
1005
          raise errors.OpPrereqError("Node %s is the primary node of"
1006
                                     " instance %s, cannot use it as"
1007
                                     " secondary" %
1008
                                     (self.op.remote_node, i.name),
1009
                                     errors.ECODE_INVAL)
1010

    
1011
  def Exec(self, feedback_fn):
1012
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)
1013

    
1014
    if not self.instance_names:
1015
      # No instances to evacuate
1016
      jobs = []
1017

    
1018
    elif self.op.iallocator is not None:
1019
      # TODO: Implement relocation to other group
1020
      evac_mode = self._MODE2IALLOCATOR[self.op.mode]
1021
      req = iallocator.IAReqNodeEvac(evac_mode=evac_mode,
1022
                                     instances=list(self.instance_names))
1023
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)
1024

    
1025
      ial.Run(self.op.iallocator)
1026

    
1027
      if not ial.success:
1028
        raise errors.OpPrereqError("Can't compute node evacuation using"
1029
                                   " iallocator '%s': %s" %
1030
                                   (self.op.iallocator, ial.info),
1031
                                   errors.ECODE_NORES)
1032

    
1033
      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)
1034

    
1035
    elif self.op.remote_node is not None:
1036
      assert self.op.mode == constants.NODE_EVAC_SEC
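      # without an iallocator we can only replace secondaries: submit one
      # replace-disks (change secondary node) job per affected instance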
1037
      jobs = [
1038
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
1039
                                        remote_node=self.op.remote_node,
1040
                                        disks=[],
1041
                                        mode=constants.REPLACE_DISK_CHG,
1042
                                        early_release=self.op.early_release)]
1043
        for instance_name in self.instance_names]
1044

    
1045
    else:
1046
      raise errors.ProgrammerError("No iallocator or remote node")
1047

    
1048
    return ResultWithJobs(jobs)
1049

    
1050

    
1051
class LUNodeMigrate(LogicalUnit):
1052
  """Migrate all instances from a node.
1053

1054
  """
1055
  HPATH = "node-migrate"
1056
  HTYPE = constants.HTYPE_NODE
1057
  REQ_BGL = False
1058

    
1059
  def CheckArguments(self):
1060
    pass
1061

    
1062
  def ExpandNames(self):
1063
    (self.op.node_uuid, self.op.node_name) = \
1064
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1065

    
1066
    self.share_locks = ShareAll()
1067
    self.needed_locks = {
1068
      locking.LEVEL_NODE: [self.op.node_uuid],
1069
      }
1070

    
1071
  def BuildHooksEnv(self):
1072
    """Build hooks env.
1073

1074
    This runs on the master, the primary and all the secondaries.
1075

1076
    """
1077
    return {
1078
      "NODE_NAME": self.op.node_name,
1079
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
1080
      }
1081

    
1082
  def BuildHooksNodes(self):
1083
    """Build hooks nodes.
1084

1085
    """
1086
    nl = [self.cfg.GetMasterNode()]
1087
    return (nl, nl)
1088

    
1089
  def CheckPrereq(self):
1090
    pass
1091

    
1092
  def Exec(self, feedback_fn):
1093
    # Prepare jobs for migrating instances
1094
    jobs = [
1095
      [opcodes.OpInstanceMigrate(
1096
        instance_name=inst.name,
1097
        mode=self.op.mode,
1098
        live=self.op.live,
1099
        iallocator=self.op.iallocator,
1100
        target_node=self.op.target_node,
1101
        allow_runtime_changes=self.op.allow_runtime_changes,
1102
        ignore_ipolicy=self.op.ignore_ipolicy)]
1103
      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]
1104

    
1105
    # TODO: Run iallocator in this opcode and pass correct placement options to
1106
    # OpInstanceMigrate. Since other jobs can modify the cluster between
1107
    # running the iallocator and the actual migration, a good consistency model
1108
    # will have to be found.
1109

    
1110
    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
1111
            frozenset([self.op.node_uuid]))
1112

    
1113
    return ResultWithJobs(jobs)
1114

    
1115

    
1116
def _GetStorageTypeArgs(cfg, storage_type):
1117
  """Returns the arguments for a storage type.
1118

1119
  """
1120
  # Special case for file storage
1121
  if storage_type == constants.ST_FILE:
1122
    # storage.FileStorage wants a list of storage directories
1123
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1124

    
1125
  return []
1126

    
1127

    
1128
class LUNodeModifyStorage(NoHooksLU):
1129
  """Logical unit for modifying a storage volume on a node.
1130

1131
  """
1132
  REQ_BGL = False
1133

    
1134
  def CheckArguments(self):
1135
    (self.op.node_uuid, self.op.node_name) = \
1136
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1137

    
1138
    storage_type = self.op.storage_type
1139

    
1140
    try:
1141
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
1142
    except KeyError:
1143
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
1144
                                 " modified" % storage_type,
1145
                                 errors.ECODE_INVAL)
1146

    
1147
    diff = set(self.op.changes.keys()) - modifiable
1148
    if diff:
1149
      raise errors.OpPrereqError("The following fields can not be modified for"
1150
                                 " storage units of type '%s': %r" %
1151
                                 (storage_type, list(diff)),
1152
                                 errors.ECODE_INVAL)
1153

    
1154
  def CheckPrereq(self):
1155
    """Check prerequisites.
1156

1157
    """
1158
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1159

    
1160
  def ExpandNames(self):
1161
    self.needed_locks = {
1162
      locking.LEVEL_NODE: self.op.node_uuid,
1163
      }
1164

    
1165
  def Exec(self, feedback_fn):
1166
    """Modifies a storage volume on the node.
1167

1168
    """
1169
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1170
    result = self.rpc.call_storage_modify(self.op.node_uuid,
1171
                                          self.op.storage_type, st_args,
1172
                                          self.op.name, self.op.changes)
1173
    result.Raise("Failed to modify storage unit '%s' on %s" %
1174
                 (self.op.name, self.op.node_name))
1175

    
1176

    
1177
class NodeQuery(QueryBase):
1178
  FIELDS = query.NODE_FIELDS
1179

    
1180
  def ExpandNames(self, lu):
1181
    lu.needed_locks = {}
1182
    lu.share_locks = ShareAll()
1183

    
1184
    if self.names:
1185
      (self.wanted, _) = GetWantedNodes(lu, self.names)
1186
    else:
1187
      self.wanted = locking.ALL_SET
1188

    
1189
    self.do_locking = (self.use_locking and
1190
                       query.NQ_LIVE in self.requested_data)
1191

    
1192
    if self.do_locking:
1193
      # If any non-static field is requested we need to lock the nodes
1194
      lu.needed_locks[locking.LEVEL_NODE] = self.wanted
1195
      lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1196

    
1197
  def DeclareLocks(self, lu, level):
1198
    pass
1199

    
1200
  def _GetQueryData(self, lu):
1201
    """Computes the list of nodes and their attributes.
1202

1203
    """
1204
    all_info = lu.cfg.GetAllNodesInfo()
1205

    
1206
    node_uuids = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)
1207

    
1208
    # Gather data as requested
1209
    if query.NQ_LIVE in self.requested_data:
1210
      # filter out non-vm_capable nodes
1211
      toquery_node_uuids = [node.uuid for node in all_info.values()
1212
                            if node.vm_capable and node.uuid in node_uuids]
1213
      lvm_enabled = utils.storage.IsLvmEnabled(
1214
          lu.cfg.GetClusterInfo().enabled_disk_templates)
1215
      # FIXME: this by default asks for storage space information for all
1216
      # enabled disk templates. Fix this by making it possible to specify
1217
      # space report fields for specific disk templates.
1218
      raw_storage_units = utils.storage.GetStorageUnitsOfCluster(
1219
          lu.cfg, include_spindles=lvm_enabled)
1220
      storage_units = rpc.PrepareStorageUnitsForNodes(
1221
          lu.cfg, raw_storage_units, toquery_node_uuids)
1222
      default_hypervisor = lu.cfg.GetHypervisorType()
1223
      hvparams = lu.cfg.GetClusterInfo().hvparams[default_hypervisor]
1224
      hvspecs = [(default_hypervisor, hvparams)]
1225
      node_data = lu.rpc.call_node_info(toquery_node_uuids, storage_units,
1226
                                        hvspecs)
1227
      live_data = dict(
1228
          (uuid, rpc.MakeLegacyNodeInfo(nresult.payload,
1229
                                        require_spindles=lvm_enabled))
1230
          for (uuid, nresult) in node_data.items()
1231
          if not nresult.fail_msg and nresult.payload)
1232
    else:
1233
      live_data = None
1234

    
1235
    if query.NQ_INST in self.requested_data:
1236
      node_to_primary = dict([(uuid, set()) for uuid in node_uuids])
1237
      node_to_secondary = dict([(uuid, set()) for uuid in node_uuids])
1238

    
1239
      inst_data = lu.cfg.GetAllInstancesInfo()
1240
      inst_uuid_to_inst_name = {}
1241

    
1242
      for inst in inst_data.values():
1243
        inst_uuid_to_inst_name[inst.uuid] = inst.name
1244
        if inst.primary_node in node_to_primary:
1245
          node_to_primary[inst.primary_node].add(inst.uuid)
1246
        for secnode in inst.secondary_nodes:
1247
          if secnode in node_to_secondary:
1248
            node_to_secondary[secnode].add(inst.uuid)
1249
    else:
1250
      node_to_primary = None
1251
      node_to_secondary = None
1252
      inst_uuid_to_inst_name = None
1253

    
1254
    if query.NQ_OOB in self.requested_data:
1255
      oob_support = dict((uuid, bool(SupportsOob(lu.cfg, node)))
1256
                         for uuid, node in all_info.iteritems())
1257
    else:
1258
      oob_support = None
1259

    
1260
    if query.NQ_GROUP in self.requested_data:
1261
      groups = lu.cfg.GetAllNodeGroupsInfo()
1262
    else:
1263
      groups = {}
1264

    
1265
    return query.NodeQueryData([all_info[uuid] for uuid in node_uuids],
1266
                               live_data, lu.cfg.GetMasterNode(),
1267
                               node_to_primary, node_to_secondary,
1268
                               inst_uuid_to_inst_name, groups, oob_support,
1269
                               lu.cfg.GetClusterInfo())
1270

    
1271

    
1272
class LUNodeQuery(NoHooksLU):
1273
  """Logical unit for querying nodes.
1274

1275
  """
1276
  # pylint: disable=W0142
1277
  REQ_BGL = False
1278

    
1279
  def CheckArguments(self):
1280
    self.nq = NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
1281
                         self.op.output_fields, self.op.use_locking)

  def ExpandNames(self):
    self.nq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.nq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    return self.nq.OldStyleQuery(self)


def _CheckOutputFields(fields, selected):
  """Checks whether all selected fields are valid according to fields.

  @type fields: L{utils.FieldSet}
  @param fields: the set of valid output fields
  @type selected: list of strings
  @param selected: the output fields selected by the user
  @raise errors.OpPrereqError: if any selected field is not valid

  """
  delta = fields.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)


class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet("node", "phys", "vg", "name", "size",
                                      "instance"),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()
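
    # With an explicit node list we only lock those nodes; otherwise we need
    # shared locks on all nodes plus the node allocation lock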
    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def Exec(self, feedback_fn):
    """Computes the list of volumes on the nodes and their attributes.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    volumes = self.rpc.call_node_volumes(node_uuids)
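
    # Map (node UUID, "vg/lv name") pairs to the instance owning the logical
    # volume, for the "instance" output field below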
    ilist = self.cfg.GetAllInstancesInfo()
    vol2inst = MapInstanceLvsToNodes(ilist.values())

    output = []
    for node_uuid in node_uuids:
      nresult = volumes[node_uuid]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue

      node_vols = sorted(nresult.payload,
                         key=operator.itemgetter("dev"))
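
      # Build one output row per volume, with columns in the order of the
      # requested output fields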
      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = self.cfg.GetNodeName(node_uuid)
          elif field == "phys":
            val = vol["dev"]
          elif field == "vg":
            val = vol["vg"]
          elif field == "name":
            val = vol["name"]
          elif field == "size":
            val = int(float(vol["size"]))
          elif field == "instance":
            inst = vol2inst.get((node_uuid, vol["vg"] + "/" + vol["name"]),
                                None)
            if inst is not None:
              val = inst.name
            else:
              val = "-"
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

  def Exec(self, feedback_fn):
    """Computes the list of storage units on the nodes and their attributes.

    """
    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      nresult = data[node_uuid]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s",
                        node_name, msg)
        continue
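
      # Index the result rows by storage unit name so they can be emitted in
      # a stable, nicely sorted order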
      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node_name
          elif field == constants.SF_TYPE:
            val = self.op.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result


class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_uuid)
    except ValueError:
      pass
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    assert node is not None

    masternode = self.cfg.GetMasterNode()
    if node.uuid == masternode:
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)
    for _, instance in self.cfg.GetAllInstancesInfo().items():
      if node.uuid in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first" % instance.name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    logging.info("Stopping the node daemon and removing configs from node %s",
                 self.node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # Promote nodes to master candidate as needed
    AdjustCandidatePool(self, exceptions=[self.node.uuid])
    self.context.RemoveNode(self.node)

    # Run post hooks on the node before it's removed
    RunPostHook(self, self.node.name)

    # we have to call this by name rather than by UUID, as the node is no
    # longer in the config
    result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node_uuid = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node_uuid,
                                              constants.ETC_HOSTS_REMOVE,
                                              self.node.name, None)
      result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)


class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type
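
    # Repairing is implemented via the SO_FIX_CONSISTENCY storage operation,
    # so the storage type must list it among its valid operations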
    if (constants.SO_FIX_CONSISTENCY not in
        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " repaired" % storage_type,
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def _CheckFaultyDisks(self, instance, node_uuid):
    """Ensure faulty disks abort the opcode or at least warn.

    """
    try:
      if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                 node_uuid, True):
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                   " node '%s'" %
                                   (instance.name,
                                    self.cfg.GetNodeName(node_uuid)),
                                   errors.ECODE_STATE)
    except errors.OpPrereqError, err:
      if self.op.ignore_consistency:
        self.LogWarning(str(err.args[0]))
      else:
        raise

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

    # Check whether any instance on this node has faulty disks
    for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
      if not inst.disks_active:
        continue
      check_nodes = set(inst.all_nodes)
      check_nodes.discard(self.op.node_uuid)
      for inst_node_uuid in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_uuid)

  def Exec(self, feedback_fn):
    """Repairs a storage volume on a node.

    """
    feedback_fn("Repairing storage unit '%s' on %s ..." %
                (self.op.name, self.op.node_name))

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_execute(self.op.node_uuid,
                                           self.op.storage_type, st_args,
                                           self.op.name,
                                           constants.SO_FIX_CONSISTENCY)
    result.Raise("Failed to repair storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))