Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib / node.py @ 44ffd981

History | View | Annotate | Download (57.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with nodes."""
23

    
24
import logging
25
import operator
26

    
27
from ganeti import constants
28
from ganeti import errors
29
from ganeti import locking
30
from ganeti import netutils
31
from ganeti import objects
32
from ganeti import opcodes
33
from ganeti import query
34
from ganeti import rpc
35
from ganeti import utils
36
from ganeti.masterd import iallocator
37

    
38
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
39
  ResultWithJobs
40
from ganeti.cmdlib.common import CheckParamsNotGlobal, \
41
  MergeAndVerifyHvState, MergeAndVerifyDiskState, \
42
  IsExclusiveStorageEnabledNode, CheckNodePVs, \
43
  RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
44
  CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
45
  AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
46
  GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
47
  FindFaultyInstanceDisks, CheckStorageTypeEnabled
48

    
49

    
50
def _DecideSelfPromotion(lu, exceptions=None):
51
  """Decide whether I should promote myself as a master candidate.
52

53
  """
54
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
55
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
56
  # the new node will increase mc_max with one, so:
57
  mc_should = min(mc_should + 1, cp_size)
58
  return mc_now < mc_should
59

    
60

    
61
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
62
  """Ensure that a node has the given secondary ip.
63

64
  @type lu: L{LogicalUnit}
65
  @param lu: the LU on behalf of which we make the check
66
  @type node: L{objects.Node}
67
  @param node: the node to check
68
  @type secondary_ip: string
69
  @param secondary_ip: the ip to check
70
  @type prereq: boolean
71
  @param prereq: whether to throw a prerequisite or an execute error
72
  @raise errors.OpPrereqError: if the node doesn't have the ip,
73
  and prereq=True
74
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
75

76
  """
77
  # this can be called with a new node, which has no UUID yet, so perform the
78
  # RPC call using its name
79
  result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
80
  result.Raise("Failure checking secondary ip on node %s" % node.name,
81
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
82
  if not result.payload:
83
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
84
           " please fix and re-run this command" % secondary_ip)
85
    if prereq:
86
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
87
    else:
88
      raise errors.OpExecError(msg)
89

    
90

    
91
class LUNodeAdd(LogicalUnit):
92
  """Logical unit for adding node to the cluster.
93

94
  """
95
  HPATH = "node-add"
96
  HTYPE = constants.HTYPE_NODE
97
  _NFLAGS = ["master_capable", "vm_capable"]
98

    
99
  def CheckArguments(self):
100
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
101
    # validate/normalize the node name
102
    self.hostname = netutils.GetHostname(name=self.op.node_name,
103
                                         family=self.primary_ip_family)
104
    self.op.node_name = self.hostname.name
105

    
106
    if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
107
      raise errors.OpPrereqError("Cannot readd the master node",
108
                                 errors.ECODE_STATE)
109

    
110
    if self.op.readd and self.op.group:
111
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
112
                                 " being readded", errors.ECODE_INVAL)
113

    
114
  def BuildHooksEnv(self):
115
    """Build hooks env.
116

117
    This will run on all nodes before, and on all nodes + the new node after.
118

119
    """
120
    return {
121
      "OP_TARGET": self.op.node_name,
122
      "NODE_NAME": self.op.node_name,
123
      "NODE_PIP": self.op.primary_ip,
124
      "NODE_SIP": self.op.secondary_ip,
125
      "MASTER_CAPABLE": str(self.op.master_capable),
126
      "VM_CAPABLE": str(self.op.vm_capable),
127
      }
128

    
129
  def BuildHooksNodes(self):
130
    """Build hooks nodes.
131

132
    """
133
    hook_nodes = self.cfg.GetNodeList()
134
    new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
135
    if new_node_info is not None:
136
      # Exclude added node
137
      hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))
138

    
139
    # add the new node as post hook node by name; it does not have an UUID yet
140
    return (hook_nodes, hook_nodes, [self.op.node_name, ])
141

    
142
  def CheckPrereq(self):
143
    """Check prerequisites.
144

145
    This checks:
146
     - the new node is not already in the config
147
     - it is resolvable
148
     - its parameters (single/dual homed) matches the cluster
149

150
    Any errors are signaled by raising errors.OpPrereqError.
151

152
    """
153
    node_name = self.hostname.name
154
    self.op.primary_ip = self.hostname.ip
155
    if self.op.secondary_ip is None:
156
      if self.primary_ip_family == netutils.IP6Address.family:
157
        raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
158
                                   " IPv4 address must be given as secondary",
159
                                   errors.ECODE_INVAL)
160
      self.op.secondary_ip = self.op.primary_ip
161

    
162
    secondary_ip = self.op.secondary_ip
163
    if not netutils.IP4Address.IsValid(secondary_ip):
164
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
165
                                 " address" % secondary_ip, errors.ECODE_INVAL)
166

    
167
    existing_node_info = self.cfg.GetNodeInfoByName(node_name)
168
    if not self.op.readd and existing_node_info is not None:
169
      raise errors.OpPrereqError("Node %s is already in the configuration" %
170
                                 node_name, errors.ECODE_EXISTS)
171
    elif self.op.readd and existing_node_info is None:
172
      raise errors.OpPrereqError("Node %s is not in the configuration" %
173
                                 node_name, errors.ECODE_NOENT)
174

    
175
    self.changed_primary_ip = False
176

    
177
    for existing_node in self.cfg.GetAllNodesInfo().values():
178
      if self.op.readd and node_name == existing_node.name:
179
        if existing_node.secondary_ip != secondary_ip:
180
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
181
                                     " address configuration as before",
182
                                     errors.ECODE_INVAL)
183
        if existing_node.primary_ip != self.op.primary_ip:
184
          self.changed_primary_ip = True
185

    
186
        continue
187

    
188
      if (existing_node.primary_ip == self.op.primary_ip or
189
          existing_node.secondary_ip == self.op.primary_ip or
190
          existing_node.primary_ip == secondary_ip or
191
          existing_node.secondary_ip == secondary_ip):
192
        raise errors.OpPrereqError("New node ip address(es) conflict with"
193
                                   " existing node %s" % existing_node.name,
194
                                   errors.ECODE_NOTUNIQUE)
195

    
196
    # After this 'if' block, None is no longer a valid value for the
197
    # _capable op attributes
198
    if self.op.readd:
199
      assert existing_node_info is not None, \
200
        "Can't retrieve locked node %s" % node_name
201
      for attr in self._NFLAGS:
202
        if getattr(self.op, attr) is None:
203
          setattr(self.op, attr, getattr(existing_node_info, attr))
204
    else:
205
      for attr in self._NFLAGS:
206
        if getattr(self.op, attr) is None:
207
          setattr(self.op, attr, True)
208

    
209
    if self.op.readd and not self.op.vm_capable:
210
      pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
211
      if pri or sec:
212
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
213
                                   " flag set to false, but it already holds"
214
                                   " instances" % node_name,
215
                                   errors.ECODE_STATE)
216

    
217
    # check that the type of the node (single versus dual homed) is the
218
    # same as for the master
219
    myself = self.cfg.GetMasterNodeInfo()
220
    master_singlehomed = myself.secondary_ip == myself.primary_ip
221
    newbie_singlehomed = secondary_ip == self.op.primary_ip
222
    if master_singlehomed != newbie_singlehomed:
223
      if master_singlehomed:
224
        raise errors.OpPrereqError("The master has no secondary ip but the"
225
                                   " new node has one",
226
                                   errors.ECODE_INVAL)
227
      else:
228
        raise errors.OpPrereqError("The master has a secondary ip but the"
229
                                   " new node doesn't have one",
230
                                   errors.ECODE_INVAL)
231

    
232
    # checks reachability
233
    if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
234
      raise errors.OpPrereqError("Node not reachable by ping",
235
                                 errors.ECODE_ENVIRON)
236

    
237
    if not newbie_singlehomed:
238
      # check reachability from my secondary ip to newbie's secondary ip
239
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
240
                              source=myself.secondary_ip):
241
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
242
                                   " based ping to node daemon port",
243
                                   errors.ECODE_ENVIRON)
244

    
245
    if self.op.readd:
246
      exceptions = [existing_node_info.uuid]
247
    else:
248
      exceptions = []
249

    
250
    if self.op.master_capable:
251
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
252
    else:
253
      self.master_candidate = False
254

    
255
    node_group = self.cfg.LookupNodeGroup(self.op.group)
256

    
257
    if self.op.readd:
258
      self.new_node = existing_node_info
259
    else:
260
      self.new_node = objects.Node(name=node_name,
261
                                   primary_ip=self.op.primary_ip,
262
                                   secondary_ip=secondary_ip,
263
                                   master_candidate=self.master_candidate,
264
                                   offline=False, drained=False,
265
                                   group=node_group, ndparams={})
266

    
267
    if self.op.ndparams:
268
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
269
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
270
                           "node", "cluster or group")
271

    
272
    if self.op.hv_state:
273
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)
274

    
275
    if self.op.disk_state:
276
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)
277

    
278
    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
279
    #       it a property on the base class.
280
    rpcrunner = rpc.DnsOnlyRunner()
281
    result = rpcrunner.call_version([node_name])[node_name]
282
    result.Raise("Can't get version information from node %s" % node_name)
283
    if constants.PROTOCOL_VERSION == result.payload:
284
      logging.info("Communication to node %s fine, sw version %s match",
285
                   node_name, result.payload)
286
    else:
287
      raise errors.OpPrereqError("Version mismatch master version %s,"
288
                                 " node version %s" %
289
                                 (constants.PROTOCOL_VERSION, result.payload),
290
                                 errors.ECODE_ENVIRON)
291

    
292
    vg_name = self.cfg.GetVGName()
293
    if vg_name is not None:
294
      vparams = {constants.NV_PVLIST: [vg_name]}
295
      excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
296
      cname = self.cfg.GetClusterName()
297
      result = rpcrunner.call_node_verify_light(
298
          [node_name], vparams, cname,
299
          self.cfg.GetClusterInfo().hvparams,
300
          {node_name: node_group},
301
          self.cfg.GetAllNodeGroupsInfoDict()
302
        )[node_name]
303
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
304
      if errmsgs:
305
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
306
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)
307

    
308
  def _InitOpenVSwitch(self):
309
    filled_ndparams = self.cfg.GetClusterInfo().FillND(
310
      self.new_node, self.cfg.GetNodeGroup(self.new_node.group))
311

    
312
    ovs = filled_ndparams.get(constants.ND_OVS, None)
313
    ovs_name = filled_ndparams.get(constants.ND_OVS_NAME, None)
314
    ovs_link = filled_ndparams.get(constants.ND_OVS_LINK, None)
315

    
316
    if ovs:
317
      if not ovs_link:
318
        self.LogInfo("No physical interface for OpenvSwitch was given."
319
                     " OpenvSwitch will not have an outside connection. This"
320
                     " might not be what you want.")
321

    
322
      result = self.rpc.call_node_configure_ovs(
323
                 self.new_node.name, ovs_name, ovs_link)
324
      result.Raise("Failed to initialize OpenVSwitch on new node")
325

    
326
  def Exec(self, feedback_fn):
327
    """Adds the new node to the cluster.
328

329
    """
330
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
331
      "Not owning BGL"
332

    
333
    # We adding a new node so we assume it's powered
334
    self.new_node.powered = True
335

    
336
    # for re-adds, reset the offline/drained/master-candidate flags;
337
    # we need to reset here, otherwise offline would prevent RPC calls
338
    # later in the procedure; this also means that if the re-add
339
    # fails, we are left with a non-offlined, broken node
340
    if self.op.readd:
341
      self.new_node.offline = False
342
      self.new_node.drained = False
343
      self.LogInfo("Readding a node, the offline/drained flags were reset")
344
      # if we demote the node, we do cleanup later in the procedure
345
      self.new_node.master_candidate = self.master_candidate
346
      if self.changed_primary_ip:
347
        self.new_node.primary_ip = self.op.primary_ip
348

    
349
    # copy the master/vm_capable flags
350
    for attr in self._NFLAGS:
351
      setattr(self.new_node, attr, getattr(self.op, attr))
352

    
353
    # notify the user about any possible mc promotion
354
    if self.new_node.master_candidate:
355
      self.LogInfo("Node will be a master candidate")
356

    
357
    if self.op.ndparams:
358
      self.new_node.ndparams = self.op.ndparams
359
    else:
360
      self.new_node.ndparams = {}
361

    
362
    if self.op.hv_state:
363
      self.new_node.hv_state_static = self.new_hv_state
364

    
365
    if self.op.disk_state:
366
      self.new_node.disk_state_static = self.new_disk_state
367

    
368
    # Add node to our /etc/hosts, and add key to known_hosts
369
    if self.cfg.GetClusterInfo().modify_etc_hosts:
370
      master_node = self.cfg.GetMasterNode()
371
      result = self.rpc.call_etc_hosts_modify(
372
                 master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
373
                 self.hostname.ip)
374
      result.Raise("Can't update hosts file with new host data")
375

    
376
    if self.new_node.secondary_ip != self.new_node.primary_ip:
377
      _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
378
                               False)
379

    
380
    node_verifier_uuids = [self.cfg.GetMasterNode()]
381
    node_verify_param = {
382
      constants.NV_NODELIST: ([self.new_node.name], {}),
383
      # TODO: do a node-net-test as well?
384
    }
385

    
386
    result = self.rpc.call_node_verify(
387
               node_verifier_uuids, node_verify_param,
388
               self.cfg.GetClusterName(),
389
               self.cfg.GetClusterInfo().hvparams,
390
               {self.new_node.name: self.cfg.LookupNodeGroup(self.op.group)},
391
               self.cfg.GetAllNodeGroupsInfoDict()
392
               )
393
    for verifier in node_verifier_uuids:
394
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
395
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
396
      if nl_payload:
397
        for failed in nl_payload:
398
          feedback_fn("ssh/hostname verification failed"
399
                      " (checking from %s): %s" %
400
                      (verifier, nl_payload[failed]))
401
        raise errors.OpExecError("ssh/hostname verification failed")
402

    
403
    self._InitOpenVSwitch()
404

    
405
    if self.op.readd:
406
      self.context.ReaddNode(self.new_node)
407
      RedistributeAncillaryFiles(self)
408
      # make sure we redistribute the config
409
      self.cfg.Update(self.new_node, feedback_fn)
410
      # and make sure the new node will not have old files around
411
      if not self.new_node.master_candidate:
412
        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
413
        result.Warn("Node failed to demote itself from master candidate status",
414
                    self.LogWarning)
415
    else:
416
      self.context.AddNode(self.new_node, self.proc.GetECId())
417
      RedistributeAncillaryFiles(self)
418

    
419

    
420
class LUNodeSetParams(LogicalUnit):
421
  """Modifies the parameters of a node.
422

423
  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
424
      to the node role (as _ROLE_*)
425
  @cvar _R2F: a dictionary from node role to tuples of flags
426
  @cvar _FLAGS: a list of attribute names corresponding to the flags
427

428
  """
429
  HPATH = "node-modify"
430
  HTYPE = constants.HTYPE_NODE
431
  REQ_BGL = False
432
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
433
  _F2R = {
434
    (True, False, False): _ROLE_CANDIDATE,
435
    (False, True, False): _ROLE_DRAINED,
436
    (False, False, True): _ROLE_OFFLINE,
437
    (False, False, False): _ROLE_REGULAR,
438
    }
439
  _R2F = dict((v, k) for k, v in _F2R.items())
440
  _FLAGS = ["master_candidate", "drained", "offline"]
441

    
442
  def CheckArguments(self):
443
    (self.op.node_uuid, self.op.node_name) = \
444
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
445
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
446
                self.op.master_capable, self.op.vm_capable,
447
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
448
                self.op.disk_state]
449
    if all_mods.count(None) == len(all_mods):
450
      raise errors.OpPrereqError("Please pass at least one modification",
451
                                 errors.ECODE_INVAL)
452
    if all_mods.count(True) > 1:
453
      raise errors.OpPrereqError("Can't set the node into more than one"
454
                                 " state at the same time",
455
                                 errors.ECODE_INVAL)
456

    
457
    # Boolean value that tells us whether we might be demoting from MC
458
    self.might_demote = (self.op.master_candidate is False or
459
                         self.op.offline is True or
460
                         self.op.drained is True or
461
                         self.op.master_capable is False)
462

    
463
    if self.op.secondary_ip:
464
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
465
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
466
                                   " address" % self.op.secondary_ip,
467
                                   errors.ECODE_INVAL)
468

    
469
    self.lock_all = self.op.auto_promote and self.might_demote
470
    self.lock_instances = self.op.secondary_ip is not None
471

    
472
  def _InstanceFilter(self, instance):
473
    """Filter for getting affected instances.
474

475
    """
476
    return (instance.disk_template in constants.DTS_INT_MIRROR and
477
            self.op.node_uuid in instance.all_nodes)
478

    
479
  def ExpandNames(self):
480
    if self.lock_all:
481
      self.needed_locks = {
482
        locking.LEVEL_NODE: locking.ALL_SET,
483

    
484
        # Block allocations when all nodes are locked
485
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
486
        }
487
    else:
488
      self.needed_locks = {
489
        locking.LEVEL_NODE: self.op.node_uuid,
490
        }
491

    
492
    # Since modifying a node can have severe effects on currently running
493
    # operations the resource lock is at least acquired in shared mode
494
    self.needed_locks[locking.LEVEL_NODE_RES] = \
495
      self.needed_locks[locking.LEVEL_NODE]
496

    
497
    # Get all locks except nodes in shared mode; they are not used for anything
498
    # but read-only access
499
    self.share_locks = ShareAll()
500
    self.share_locks[locking.LEVEL_NODE] = 0
501
    self.share_locks[locking.LEVEL_NODE_RES] = 0
502
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 0
503

    
504
    if self.lock_instances:
505
      self.needed_locks[locking.LEVEL_INSTANCE] = \
506
        self.cfg.GetInstanceNames(
507
          self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())
508

    
509
  def BuildHooksEnv(self):
510
    """Build hooks env.
511

512
    This runs on the master node.
513

514
    """
515
    return {
516
      "OP_TARGET": self.op.node_name,
517
      "MASTER_CANDIDATE": str(self.op.master_candidate),
518
      "OFFLINE": str(self.op.offline),
519
      "DRAINED": str(self.op.drained),
520
      "MASTER_CAPABLE": str(self.op.master_capable),
521
      "VM_CAPABLE": str(self.op.vm_capable),
522
      }
523

    
524
  def BuildHooksNodes(self):
525
    """Build hooks nodes.
526

527
    """
528
    nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
529
    return (nl, nl)
530

    
531
  def CheckPrereq(self):
532
    """Check prerequisites.
533

534
    This only checks the instance list against the existing names.
535

536
    """
537
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
538
    if self.lock_instances:
539
      affected_instances = \
540
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
541

    
542
      # Verify instance locks
543
      owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
544
      wanted_instance_names = frozenset([inst.name for inst in
545
                                         affected_instances.values()])
546
      if wanted_instance_names - owned_instance_names:
547
        raise errors.OpPrereqError("Instances affected by changing node %s's"
548
                                   " secondary IP address have changed since"
549
                                   " locks were acquired, wanted '%s', have"
550
                                   " '%s'; retry the operation" %
551
                                   (node.name,
552
                                    utils.CommaJoin(wanted_instance_names),
553
                                    utils.CommaJoin(owned_instance_names)),
554
                                   errors.ECODE_STATE)
555
    else:
556
      affected_instances = None
557

    
558
    if (self.op.master_candidate is not None or
559
        self.op.drained is not None or
560
        self.op.offline is not None):
561
      # we can't change the master's node flags
562
      if node.uuid == self.cfg.GetMasterNode():
563
        raise errors.OpPrereqError("The master role can be changed"
564
                                   " only via master-failover",
565
                                   errors.ECODE_INVAL)
566

    
567
    if self.op.master_candidate and not node.master_capable:
568
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
569
                                 " it a master candidate" % node.name,
570
                                 errors.ECODE_STATE)
571

    
572
    if self.op.vm_capable is False:
573
      (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
574
      if ipri or isec:
575
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
576
                                   " the vm_capable flag" % node.name,
577
                                   errors.ECODE_STATE)
578

    
579
    if node.master_candidate and self.might_demote and not self.lock_all:
580
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
581
      # check if after removing the current node, we're missing master
582
      # candidates
583
      (mc_remaining, mc_should, _) = \
584
          self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
585
      if mc_remaining < mc_should:
586
        raise errors.OpPrereqError("Not enough master candidates, please"
587
                                   " pass auto promote option to allow"
588
                                   " promotion (--auto-promote or RAPI"
589
                                   " auto_promote=True)", errors.ECODE_STATE)
590

    
591
    self.old_flags = old_flags = (node.master_candidate,
592
                                  node.drained, node.offline)
593
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
594
    self.old_role = old_role = self._F2R[old_flags]
595

    
596
    # Check for ineffective changes
597
    for attr in self._FLAGS:
598
      if getattr(self.op, attr) is False and getattr(node, attr) is False:
599
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
600
        setattr(self.op, attr, None)
601

    
602
    # Past this point, any flag change to False means a transition
603
    # away from the respective state, as only real changes are kept
604

    
605
    # TODO: We might query the real power state if it supports OOB
606
    if SupportsOob(self.cfg, node):
607
      if self.op.offline is False and not (node.powered or
608
                                           self.op.powered is True):
609
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
610
                                    " offline status can be reset") %
611
                                   self.op.node_name, errors.ECODE_STATE)
612
    elif self.op.powered is not None:
613
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
614
                                  " as it does not support out-of-band"
615
                                  " handling") % self.op.node_name,
616
                                 errors.ECODE_STATE)
617

    
618
    # If we're being deofflined/drained, we'll MC ourself if needed
619
    if (self.op.drained is False or self.op.offline is False or
620
        (self.op.master_capable and not node.master_capable)):
621
      if _DecideSelfPromotion(self):
622
        self.op.master_candidate = True
623
        self.LogInfo("Auto-promoting node to master candidate")
624

    
625
    # If we're no longer master capable, we'll demote ourselves from MC
626
    if self.op.master_capable is False and node.master_candidate:
627
      self.LogInfo("Demoting from master candidate")
628
      self.op.master_candidate = False
629

    
630
    # Compute new role
631
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
632
    if self.op.master_candidate:
633
      new_role = self._ROLE_CANDIDATE
634
    elif self.op.drained:
635
      new_role = self._ROLE_DRAINED
636
    elif self.op.offline:
637
      new_role = self._ROLE_OFFLINE
638
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
639
      # False is still in new flags, which means we're un-setting (the
640
      # only) True flag
641
      new_role = self._ROLE_REGULAR
642
    else: # no new flags, nothing, keep old role
643
      new_role = old_role
644

    
645
    self.new_role = new_role
646

    
647
    if old_role == self._ROLE_OFFLINE and new_role != old_role:
648
      # Trying to transition out of offline status
649
      result = self.rpc.call_version([node.uuid])[node.uuid]
650
      if result.fail_msg:
651
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
652
                                   " to report its version: %s" %
653
                                   (node.name, result.fail_msg),
654
                                   errors.ECODE_STATE)
655
      else:
656
        self.LogWarning("Transitioning node from offline to online state"
657
                        " without using re-add. Please make sure the node"
658
                        " is healthy!")
659

    
660
    # When changing the secondary ip, verify if this is a single-homed to
661
    # multi-homed transition or vice versa, and apply the relevant
662
    # restrictions.
663
    if self.op.secondary_ip:
664
      # Ok even without locking, because this can't be changed by any LU
665
      master = self.cfg.GetMasterNodeInfo()
666
      master_singlehomed = master.secondary_ip == master.primary_ip
667
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
668
        if self.op.force and node.uuid == master.uuid:
669
          self.LogWarning("Transitioning from single-homed to multi-homed"
670
                          " cluster; all nodes will require a secondary IP"
671
                          " address")
672
        else:
673
          raise errors.OpPrereqError("Changing the secondary ip on a"
674
                                     " single-homed cluster requires the"
675
                                     " --force option to be passed, and the"
676
                                     " target node to be the master",
677
                                     errors.ECODE_INVAL)
678
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
679
        if self.op.force and node.uuid == master.uuid:
680
          self.LogWarning("Transitioning from multi-homed to single-homed"
681
                          " cluster; secondary IP addresses will have to be"
682
                          " removed")
683
        else:
684
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
685
                                     " same as the primary IP on a multi-homed"
686
                                     " cluster, unless the --force option is"
687
                                     " passed, and the target node is the"
688
                                     " master", errors.ECODE_INVAL)
689

    
690
      assert not (set([inst.name for inst in affected_instances.values()]) -
691
                  self.owned_locks(locking.LEVEL_INSTANCE))
692

    
693
      if node.offline:
694
        if affected_instances:
695
          msg = ("Cannot change secondary IP address: offline node has"
696
                 " instances (%s) configured to use it" %
697
                 utils.CommaJoin(
698
                   [inst.name for inst in affected_instances.values()]))
699
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
700
      else:
701
        # On online nodes, check that no instances are running, and that
702
        # the node has the new ip and we can reach it.
703
        for instance in affected_instances.values():
704
          CheckInstanceState(self, instance, INSTANCE_DOWN,
705
                             msg="cannot change secondary ip")
706

    
707
        _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
708
        if master.uuid != node.uuid:
709
          # check reachability from master secondary ip to new secondary ip
710
          if not netutils.TcpPing(self.op.secondary_ip,
711
                                  constants.DEFAULT_NODED_PORT,
712
                                  source=master.secondary_ip):
713
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
714
                                       " based ping to node daemon port",
715
                                       errors.ECODE_ENVIRON)
716

    
717
    if self.op.ndparams:
718
      new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
719
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
720
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
721
                           "node", "cluster or group")
722
      self.new_ndparams = new_ndparams
723

    
724
    if self.op.hv_state:
725
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
726
                                                node.hv_state_static)
727

    
728
    if self.op.disk_state:
729
      self.new_disk_state = \
730
        MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)
731

    
732
  def Exec(self, feedback_fn):
733
    """Modifies a node.
734

735
    """
736
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
737
    result = []
738

    
739
    if self.op.ndparams:
740
      node.ndparams = self.new_ndparams
741

    
742
    if self.op.powered is not None:
743
      node.powered = self.op.powered
744

    
745
    if self.op.hv_state:
746
      node.hv_state_static = self.new_hv_state
747

    
748
    if self.op.disk_state:
749
      node.disk_state_static = self.new_disk_state
750

    
751
    for attr in ["master_capable", "vm_capable"]:
752
      val = getattr(self.op, attr)
753
      if val is not None:
754
        setattr(node, attr, val)
755
        result.append((attr, str(val)))
756

    
757
    if self.new_role != self.old_role:
758
      # Tell the node to demote itself, if no longer MC and not offline
759
      if self.old_role == self._ROLE_CANDIDATE and \
760
          self.new_role != self._ROLE_OFFLINE:
761
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
762
        if msg:
763
          self.LogWarning("Node failed to demote itself: %s", msg)
764

    
765
      new_flags = self._R2F[self.new_role]
766
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
767
        if of != nf:
768
          result.append((desc, str(nf)))
769
      (node.master_candidate, node.drained, node.offline) = new_flags
770

    
771
      # we locked all nodes, we adjust the CP before updating this node
772
      if self.lock_all:
773
        AdjustCandidatePool(self, [node.uuid])
774

    
775
    if self.op.secondary_ip:
776
      node.secondary_ip = self.op.secondary_ip
777
      result.append(("secondary_ip", self.op.secondary_ip))
778

    
779
    # this will trigger configuration file update, if needed
780
    self.cfg.Update(node, feedback_fn)
781

    
782
    # this will trigger job queue propagation or cleanup if the mc
783
    # flag changed
784
    if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
785
      self.context.ReaddNode(node)
786

    
787
    return result
788

    
789

    
790
class LUNodePowercycle(NoHooksLU):
791
  """Powercycles a node.
792

793
  """
794
  REQ_BGL = False
795

    
796
  def CheckArguments(self):
797
    (self.op.node_uuid, self.op.node_name) = \
798
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
799

    
800
    if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
801
      raise errors.OpPrereqError("The node is the master and the force"
802
                                 " parameter was not set",
803
                                 errors.ECODE_INVAL)
804

    
805
  def ExpandNames(self):
806
    """Locking for PowercycleNode.
807

808
    This is a last-resort option and shouldn't block on other
809
    jobs. Therefore, we grab no locks.
810

811
    """
812
    self.needed_locks = {}
813

    
814
  def Exec(self, feedback_fn):
815
    """Reboots a node.
816

817
    """
818
    default_hypervisor = self.cfg.GetHypervisorType()
819
    hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
820
    result = self.rpc.call_node_powercycle(self.op.node_uuid,
821
                                           default_hypervisor,
822
                                           hvparams)
823
    result.Raise("Failed to schedule the reboot")
824
    return result.payload
825

    
826

    
827
def _GetNodeInstancesInner(cfg, fn):
828
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
829

    
830

    
831
def _GetNodePrimaryInstances(cfg, node_uuid):
832
  """Returns primary instances on a node.
833

834
  """
835
  return _GetNodeInstancesInner(cfg,
836
                                lambda inst: node_uuid == inst.primary_node)
837

    
838

    
839
def _GetNodeSecondaryInstances(cfg, node_uuid):
840
  """Returns secondary instances on a node.
841

842
  """
843
  return _GetNodeInstancesInner(cfg,
844
                                lambda inst: node_uuid in inst.secondary_nodes)
845

    
846

    
847
def _GetNodeInstances(cfg, node_uuid):
848
  """Returns a list of all primary and secondary instances on a node.
849

850
  """
851

    
852
  return _GetNodeInstancesInner(cfg, lambda inst: node_uuid in inst.all_nodes)
853

    
854

    
855
class LUNodeEvacuate(NoHooksLU):
856
  """Evacuates instances off a list of nodes.
857

858
  """
859
  REQ_BGL = False
860

    
861
  def CheckArguments(self):
862
    CheckIAllocatorOrNode(self, "iallocator", "remote_node")
863

    
864
  def ExpandNames(self):
865
    (self.op.node_uuid, self.op.node_name) = \
866
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
867

    
868
    if self.op.remote_node is not None:
869
      (self.op.remote_node_uuid, self.op.remote_node) = \
870
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
871
                              self.op.remote_node)
872
      assert self.op.remote_node
873

    
874
      if self.op.node_uuid == self.op.remote_node_uuid:
875
        raise errors.OpPrereqError("Can not use evacuated node as a new"
876
                                   " secondary node", errors.ECODE_INVAL)
877

    
878
      if self.op.mode != constants.NODE_EVAC_SEC:
879
        raise errors.OpPrereqError("Without the use of an iallocator only"
880
                                   " secondary instances can be evacuated",
881
                                   errors.ECODE_INVAL)
882

    
883
    # Declare locks
884
    self.share_locks = ShareAll()
885
    self.needed_locks = {
886
      locking.LEVEL_INSTANCE: [],
887
      locking.LEVEL_NODEGROUP: [],
888
      locking.LEVEL_NODE: [],
889
      }
890

    
891
    # Determine nodes (via group) optimistically, needs verification once locks
892
    # have been acquired
893
    self.lock_nodes = self._DetermineNodes()
894

    
895
  def _DetermineNodes(self):
896
    """Gets the list of node UUIDs to operate on.
897

898
    """
899
    if self.op.remote_node is None:
900
      # Iallocator will choose any node(s) in the same group
901
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
902
    else:
903
      group_nodes = frozenset([self.op.remote_node_uuid])
904

    
905
    # Determine nodes to be locked
906
    return set([self.op.node_uuid]) | group_nodes
907

    
908
  def _DetermineInstances(self):
909
    """Builds list of instances to operate on.
910

911
    """
912
    assert self.op.mode in constants.NODE_EVAC_MODES
913

    
914
    if self.op.mode == constants.NODE_EVAC_PRI:
915
      # Primary instances only
916
      inst_fn = _GetNodePrimaryInstances
917
      assert self.op.remote_node is None, \
918
        "Evacuating primary instances requires iallocator"
919
    elif self.op.mode == constants.NODE_EVAC_SEC:
920
      # Secondary instances only
921
      inst_fn = _GetNodeSecondaryInstances
922
    else:
923
      # All instances
924
      assert self.op.mode == constants.NODE_EVAC_ALL
925
      inst_fn = _GetNodeInstances
926
      # TODO: In 2.6, change the iallocator interface to take an evacuation mode
927
      # per instance
928
      raise errors.OpPrereqError("Due to an issue with the iallocator"
929
                                 " interface it is not possible to evacuate"
930
                                 " all instances at once; specify explicitly"
931
                                 " whether to evacuate primary or secondary"
932
                                 " instances",
933
                                 errors.ECODE_INVAL)
934

    
935
    return inst_fn(self.cfg, self.op.node_uuid)
936

    
937
  def DeclareLocks(self, level):
938
    if level == locking.LEVEL_INSTANCE:
939
      # Lock instances optimistically, needs verification once node and group
940
      # locks have been acquired
941
      self.needed_locks[locking.LEVEL_INSTANCE] = \
942
        set(i.name for i in self._DetermineInstances())
943

    
944
    elif level == locking.LEVEL_NODEGROUP:
945
      # Lock node groups for all potential target nodes optimistically, needs
946
      # verification once nodes have been acquired
947
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
948
        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)
949

    
950
    elif level == locking.LEVEL_NODE:
951
      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
952

    
953
  def CheckPrereq(self):
954
    # Verify locks
955
    owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
956
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
957
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
958

    
959
    need_nodes = self._DetermineNodes()
960

    
961
    if not owned_nodes.issuperset(need_nodes):
962
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
963
                                 " locks were acquired, current nodes are"
964
                                 " are '%s', used to be '%s'; retry the"
965
                                 " operation" %
966
                                 (self.op.node_name,
967
                                  utils.CommaJoin(need_nodes),
968
                                  utils.CommaJoin(owned_nodes)),
969
                                 errors.ECODE_STATE)
970

    
971
    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
972
    if owned_groups != wanted_groups:
973
      raise errors.OpExecError("Node groups changed since locks were acquired,"
974
                               " current groups are '%s', used to be '%s';"
975
                               " retry the operation" %
976
                               (utils.CommaJoin(wanted_groups),
977
                                utils.CommaJoin(owned_groups)))
978

    
979
    # Determine affected instances
980
    self.instances = self._DetermineInstances()
981
    self.instance_names = [i.name for i in self.instances]
982

    
983
    if set(self.instance_names) != owned_instance_names:
984
      raise errors.OpExecError("Instances on node '%s' changed since locks"
985
                               " were acquired, current instances are '%s',"
986
                               " used to be '%s'; retry the operation" %
987
                               (self.op.node_name,
988
                                utils.CommaJoin(self.instance_names),
989
                                utils.CommaJoin(owned_instance_names)))
990

    
991
    if self.instance_names:
992
      self.LogInfo("Evacuating instances from node '%s': %s",
993
                   self.op.node_name,
994
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
995
    else:
996
      self.LogInfo("No instances to evacuate from node '%s'",
997
                   self.op.node_name)
998

    
999
    if self.op.remote_node is not None:
1000
      for i in self.instances:
1001
        if i.primary_node == self.op.remote_node_uuid:
1002
          raise errors.OpPrereqError("Node %s is the primary node of"
1003
                                     " instance %s, cannot use it as"
1004
                                     " secondary" %
1005
                                     (self.op.remote_node, i.name),
1006
                                     errors.ECODE_INVAL)
1007

    
1008
  def Exec(self, feedback_fn):
1009
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)
1010

    
1011
    if not self.instance_names:
1012
      # No instances to evacuate
1013
      jobs = []
1014

    
1015
    elif self.op.iallocator is not None:
1016
      # TODO: Implement relocation to other group
1017
      req = iallocator.IAReqNodeEvac(evac_mode=self.op.mode,
1018
                                     instances=list(self.instance_names))
1019
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)
1020

    
1021
      ial.Run(self.op.iallocator)
1022

    
1023
      if not ial.success:
1024
        raise errors.OpPrereqError("Can't compute node evacuation using"
1025
                                   " iallocator '%s': %s" %
1026
                                   (self.op.iallocator, ial.info),
1027
                                   errors.ECODE_NORES)
1028

    
1029
      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)
1030

    
1031
    elif self.op.remote_node is not None:
1032
      assert self.op.mode == constants.NODE_EVAC_SEC
1033
      jobs = [
1034
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
1035
                                        remote_node=self.op.remote_node,
1036
                                        disks=[],
1037
                                        mode=constants.REPLACE_DISK_CHG,
1038
                                        early_release=self.op.early_release)]
1039
        for instance_name in self.instance_names]
1040

    
1041
    else:
1042
      raise errors.ProgrammerError("No iallocator or remote node")
1043

    
1044
    return ResultWithJobs(jobs)
1045

    
1046

    
1047
class LUNodeMigrate(LogicalUnit):
1048
  """Migrate all instances from a node.
1049

1050
  """
1051
  HPATH = "node-migrate"
1052
  HTYPE = constants.HTYPE_NODE
1053
  REQ_BGL = False
1054

    
1055
  def CheckArguments(self):
1056
    pass
1057

    
1058
  def ExpandNames(self):
1059
    (self.op.node_uuid, self.op.node_name) = \
1060
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1061

    
1062
    self.share_locks = ShareAll()
1063
    self.needed_locks = {
1064
      locking.LEVEL_NODE: [self.op.node_uuid],
1065
      }
1066

    
1067
  def BuildHooksEnv(self):
1068
    """Build hooks env.
1069

1070
    This runs on the master, the primary and all the secondaries.
1071

1072
    """
1073
    return {
1074
      "NODE_NAME": self.op.node_name,
1075
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
1076
      }
1077

    
1078
  def BuildHooksNodes(self):
1079
    """Build hooks nodes.
1080

1081
    """
1082
    nl = [self.cfg.GetMasterNode()]
1083
    return (nl, nl)
1084

    
1085
  def CheckPrereq(self):
1086
    pass
1087

    
1088
  def Exec(self, feedback_fn):
1089
    # Prepare jobs for migration instances
1090
    jobs = [
1091
      [opcodes.OpInstanceMigrate(
1092
        instance_name=inst.name,
1093
        mode=self.op.mode,
1094
        live=self.op.live,
1095
        iallocator=self.op.iallocator,
1096
        target_node=self.op.target_node,
1097
        allow_runtime_changes=self.op.allow_runtime_changes,
1098
        ignore_ipolicy=self.op.ignore_ipolicy)]
1099
      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]
1100

    
1101
    # TODO: Run iallocator in this opcode and pass correct placement options to
1102
    # OpInstanceMigrate. Since other jobs can modify the cluster between
1103
    # running the iallocator and the actual migration, a good consistency model
1104
    # will have to be found.
1105

    
1106
    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
1107
            frozenset([self.op.node_uuid]))
1108

    
1109
    return ResultWithJobs(jobs)
1110

    
1111

    
1112
def _GetStorageTypeArgs(cfg, storage_type):
1113
  """Returns the arguments for a storage type.
1114

1115
  """
1116
  # Special case for file storage
1117
  if storage_type == constants.ST_FILE:
1118
    # storage.FileStorage wants a list of storage directories
1119
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1120

    
1121
  return []
1122

    
1123

    
1124
class LUNodeModifyStorage(NoHooksLU):
1125
  """Logical unit for modifying a storage volume on a node.
1126

1127
  """
1128
  REQ_BGL = False
1129

    
1130
  def CheckArguments(self):
1131
    (self.op.node_uuid, self.op.node_name) = \
1132
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1133

    
1134
    storage_type = self.op.storage_type
1135

    
1136
    try:
1137
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
1138
    except KeyError:
1139
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
1140
                                 " modified" % storage_type,
1141
                                 errors.ECODE_INVAL)
1142

    
1143
    diff = set(self.op.changes.keys()) - modifiable
1144
    if diff:
1145
      raise errors.OpPrereqError("The following fields can not be modified for"
1146
                                 " storage units of type '%s': %r" %
1147
                                 (storage_type, list(diff)),
1148
                                 errors.ECODE_INVAL)
1149

    
1150
  def CheckPrereq(self):
1151
    """Check prerequisites.
1152

1153
    """
1154
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1155

    
1156
  def ExpandNames(self):
1157
    self.needed_locks = {
1158
      locking.LEVEL_NODE: self.op.node_uuid,
1159
      }
1160

    
1161
  def Exec(self, feedback_fn):
1162
    """Computes the list of nodes and their attributes.
1163

1164
    """
1165
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1166
    result = self.rpc.call_storage_modify(self.op.node_uuid,
1167
                                          self.op.storage_type, st_args,
1168
                                          self.op.name, self.op.changes)
1169
    result.Raise("Failed to modify storage unit '%s' on %s" %
1170
                 (self.op.name, self.op.node_name))
1171

    
1172

    
1173
class NodeQuery(QueryBase):
1174
  FIELDS = query.NODE_FIELDS
1175

    
1176
  def ExpandNames(self, lu):
1177
    raise NotImplementedError
1178

    
1179
  def DeclareLocks(self, lu, level):
1180
    pass
1181

    
1182
  def _GetQueryData(self, lu):
1183
    raise NotImplementedError
1184

    
1185

    
1186
class LUNodeQuery(NoHooksLU):
1187
  """Logical unit for querying nodes.
1188

1189
  """
1190
  # pylint: disable=W0142
1191
  REQ_BGL = False
1192

    
1193
  def CheckArguments(self):
1194
    raise NotImplementedError
1195

    
1196
  def ExpandNames(self):
1197
    raise NotImplementedError
1198

    
1199
  def DeclareLocks(self, level):
1200
    raise NotImplementedError
1201

    
1202
  def Exec(self, feedback_fn):
1203
    raise NotImplementedError
1204

    
1205

    
1206
def _CheckOutputFields(fields, selected):
1207
  """Checks whether all selected fields are valid according to fields.
1208

1209
  @type fields: L{utils.FieldSet}
1210
  @param fields: fields set
1211
  @type selected: L{utils.FieldSet}
1212
  @param selected: fields set
1213

1214
  """
1215
  delta = fields.NonMatching(selected)
1216
  if delta:
1217
    raise errors.OpPrereqError("Unknown output fields selected: %s"
1218
                               % ",".join(delta), errors.ECODE_INVAL)
1219

    
1220

    
1221
class LUNodeQueryvols(NoHooksLU):
1222
  """Logical unit for getting volumes on node(s).
1223

1224
  """
1225
  REQ_BGL = False
1226

    
1227
  def CheckArguments(self):
1228
    _CheckOutputFields(utils.FieldSet(constants.VF_NODE, constants.VF_PHYS,
1229
                                      constants.VF_VG, constants.VF_NAME,
1230
                                      constants.VF_SIZE, constants.VF_INSTANCE),
1231
                       self.op.output_fields)
1232

    
1233
  def ExpandNames(self):
1234
    self.share_locks = ShareAll()
1235

    
1236
    if self.op.nodes:
1237
      self.needed_locks = {
1238
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
1239
        }
1240
    else:
1241
      self.needed_locks = {
1242
        locking.LEVEL_NODE: locking.ALL_SET,
1243
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1244
        }
1245

    
1246
  def Exec(self, feedback_fn):
1247
    """Computes the list of nodes and their attributes.
1248

1249
    """
1250
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
1251
    volumes = self.rpc.call_node_volumes(node_uuids)
1252

    
1253
    ilist = self.cfg.GetAllInstancesInfo()
1254
    vol2inst = MapInstanceLvsToNodes(ilist.values())
1255

    
1256
    output = []
1257
    for node_uuid in node_uuids:
1258
      nresult = volumes[node_uuid]
1259
      if nresult.offline:
1260
        continue
1261
      msg = nresult.fail_msg
1262
      if msg:
1263
        self.LogWarning("Can't compute volume data on node %s: %s",
1264
                        self.cfg.GetNodeName(node_uuid), msg)
1265
        continue
1266

    
1267
      node_vols = sorted(nresult.payload,
1268
                         key=operator.itemgetter(constants.VF_DEV))
1269

    
1270
      for vol in node_vols:
1271
        node_output = []
1272
        for field in self.op.output_fields:
1273
          if field == constants.VF_NODE:
1274
            val = self.cfg.GetNodeName(node_uuid)
1275
          elif field == constants.VF_PHYS:
1276
            val = vol[constants.VF_DEV]
1277
          elif field == constants.VF_VG:
1278
            val = vol[constants.VF_VG]
1279
          elif field == constants.VF_NAME:
1280
            val = vol[constants.VF_NAME]
1281
          elif field == constants.VF_SIZE:
            val = int(float(vol[constants.VF_SIZE]))
          elif field == constants.VF_INSTANCE:
            inst = vol2inst.get((node_uuid, vol[constants.VF_VG] + "/" +
                                 vol[constants.VF_NAME]), None)
            if inst is not None:
              val = inst.name
            else:
              val = "-"
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def _DetermineStorageType(self):
    """Determines the default storage type of the cluster.

    """
    enabled_disk_templates = self.cfg.GetClusterInfo().enabled_disk_templates
    default_storage_type = \
        constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[enabled_disk_templates[0]]
    return default_storage_type

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.storage_type:
      CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()
      if self.storage_type not in constants.STS_REPORT:
        raise errors.OpPrereqError(
            "Storage reporting for storage type '%s' is not supported. Please"
            " use the --storage-type option to specify one of the supported"
            " storage types (%s) or set the default disk template to one that"
            " supports storage reporting." %
            (self.storage_type, utils.CommaJoin(constants.STS_REPORT)))

  def Exec(self, feedback_fn):
    """Computes the list of storage units and their attributes.

    """
    if self.op.storage_type:
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()

    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

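    # Ask every node's storage backend for the requested fields; each payload
    # row carries its values in the same order as the fields list built above.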
    st_args = _GetStorageTypeArgs(self.cfg, self.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      nresult = data[node_uuid]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s",
                        node_name, msg)
        continue

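      # Index the rows by their name field so the per-node output can be
      # emitted in NiceSort order.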
      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node_name
          elif field == constants.SF_TYPE:
            val = self.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result


class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_uuid)
    except ValueError:
      pass
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    assert node is not None

    masternode = self.cfg.GetMasterNode()
    if node.uuid == masternode:
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)

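    # Refuse removal while any instance still uses this node as a primary or
    # secondary node.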
    for _, instance in self.cfg.GetAllInstancesInfo().items():
      if node.uuid in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first" % instance.name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    logging.info("Stopping the node daemon and removing configs from node %s",
                 self.node.name)

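    # Cluster-level flag saying whether Ganeti manages the nodes' SSH setup;
    # it is forwarded to node_leave_cluster below so the leaving node knows
    # whether to revert that setup.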
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # Promote nodes to master candidate as needed
    AdjustCandidatePool(self, exceptions=[self.node.uuid])
    self.context.RemoveNode(self.node)

    # Run post hooks on the node before it's removed
    RunPostHook(self, self.node.name)

    # we have to call this by name rather than by UUID, as the node is no
    # longer in the config
    result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node_uuid = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node_uuid,
                                              constants.ETC_HOSTS_REMOVE,
                                              self.node.name, None)
      result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)


class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    if (constants.SO_FIX_CONSISTENCY not in
        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " repaired" % storage_type,
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def _CheckFaultyDisks(self, instance, node_uuid):
    """Ensure faulty disks abort the opcode or at least warn."""
    try:
      if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                 node_uuid, True):
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                   " node '%s'" %
                                   (instance.name,
                                    self.cfg.GetNodeName(node_uuid)),
                                   errors.ECODE_STATE)
    except errors.OpPrereqError, err:
      if self.op.ignore_consistency:
        self.LogWarning(str(err.args[0]))
      else:
        raise

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

    # Check whether any instance on this node has faulty disks
    for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
      if not inst.disks_active:
        continue
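      # Check only the instance's other nodes; the node under repair itself is
      # excluded from the faulty-disk check.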
      check_nodes = set(inst.all_nodes)
      check_nodes.discard(self.op.node_uuid)
      for inst_node_uuid in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_uuid)

  def Exec(self, feedback_fn):
    feedback_fn("Repairing storage unit '%s' on %s ..." %
                (self.op.name, self.op.node_name))

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_execute(self.op.node_uuid,
                                           self.op.storage_type, st_args,
                                           self.op.name,
                                           constants.SO_FIX_CONSISTENCY)
    result.Raise("Failed to repair storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))