Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib / node.py @ 602db636

History | View | Annotate | Download (56.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with nodes."""
23

    
24
import logging
25
import operator
26

    
27
from ganeti import constants
28
from ganeti import errors
29
from ganeti import locking
30
from ganeti import netutils
31
from ganeti import objects
32
from ganeti import opcodes
33
from ganeti import rpc
34
from ganeti import utils
35
from ganeti.masterd import iallocator
36

    
37
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, ResultWithJobs
38
from ganeti.cmdlib.common import CheckParamsNotGlobal, \
39
  MergeAndVerifyHvState, MergeAndVerifyDiskState, \
40
  IsExclusiveStorageEnabledNode, CheckNodePVs, \
41
  RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
42
  CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
43
  AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
44
  GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
45
  FindFaultyInstanceDisks, CheckStorageTypeEnabled
46

    
47

    
48
def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether this node should promote itself to master candidate.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which the decision is made
  @type exceptions: list
  @param exceptions: optional node UUIDs to leave out of the
      master-candidate statistics
  @rtype: boolean
  @return: True if the current number of master candidates is below what
      the pool should hold once this node is added

  """
  pool_size = lu.cfg.GetClusterInfo().candidate_pool_size
  current, desired, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # adding the new node raises the attainable maximum by one, but never
  # beyond the configured candidate pool size
  desired = min(desired + 1, pool_size)
  return current < desired
57

    
58

    
59
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: L{objects.Node}
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip,
  and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  # the node may be brand new and have no UUID assigned yet, so the RPC
  # call must address it by name
  result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node.name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if result.payload:
    return
  msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
         " please fix and re-run this command" % secondary_ip)
  if prereq:
    raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
  raise errors.OpExecError(msg)
87

    
88

    
89
class LUNodeAdd(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  # op flags that are copied verbatim from the opcode onto the node object
  _NFLAGS = ["master_capable", "vm_capable"]

  def CheckArguments(self):
    """Validate and normalize the opcode arguments.

    Resolves the node name (normalizing it in the process) and rejects
    invalid combinations: re-adding the master node, or passing a node
    group together with a re-add.

    """
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
    # validate/normalize the node name
    self.hostname = netutils.GetHostname(name=self.op.node_name,
                                         family=self.primary_ip_family)
    self.op.node_name = self.hostname.name

    if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
      raise errors.OpPrereqError("Cannot readd the master node",
                                 errors.ECODE_STATE)

    if self.op.readd and self.op.group:
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                 " being readded", errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    hook_nodes = self.cfg.GetNodeList()
    new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
    if new_node_info is not None:
      # Exclude added node
      hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))

    # add the new node as post hook node by name; it does not have an UUID yet
    return (hook_nodes, hook_nodes, [self.op.node_name, ])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.hostname.name
    self.op.primary_ip = self.hostname.ip
    if self.op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
                                   " IPv4 address must be given as secondary",
                                   errors.ECODE_INVAL)
      # single-homed setup: the secondary IP defaults to the primary one
      self.op.secondary_ip = self.op.primary_ip

    secondary_ip = self.op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip, errors.ECODE_INVAL)

    existing_node_info = self.cfg.GetNodeInfoByName(node_name)
    if not self.op.readd and existing_node_info is not None:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node_name, errors.ECODE_EXISTS)
    elif self.op.readd and existing_node_info is None:
      raise errors.OpPrereqError("Node %s is not in the configuration" %
                                 node_name, errors.ECODE_NOENT)

    # set to True below if a re-add changes the node's primary IP
    self.changed_primary_ip = False

    # check the new node's addresses against every configured node
    for existing_node in self.cfg.GetAllNodesInfo().values():
      if self.op.readd and node_name == existing_node.name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != self.op.primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == self.op.primary_ip or
          existing_node.secondary_ip == self.op.primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # After this 'if' block, None is no longer a valid value for the
    # _capable op attributes
    if self.op.readd:
      assert existing_node_info is not None, \
        "Can't retrieve locked node %s" % node_name
      # unspecified flags keep the values the node had before
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, getattr(existing_node_info, attr))
    else:
      # for a fresh add, unspecified flags default to True
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, True)

    if self.op.readd and not self.op.vm_capable:
      pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % node_name,
                                   errors.ECODE_STATE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = self.cfg.GetMasterNodeInfo()
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == self.op.primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                              source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    # when re-adding, leave the node itself out of the candidate statistics
    if self.op.readd:
      exceptions = [existing_node_info.uuid]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False

    node_group = self.cfg.LookupNodeGroup(self.op.group)

    if self.op.readd:
      self.new_node = existing_node_info
    else:
      self.new_node = objects.Node(name=node_name,
                                   primary_ip=self.op.primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=node_group, ndparams={})

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)

    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
    #       it a property on the base class.
    rpcrunner = rpc.DnsOnlyRunner()
    result = rpcrunner.call_version([node_name])[node_name]
    result.Raise("Can't get version information from node %s" % node_name)
    # the node daemon must speak exactly our protocol version
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node_name, result.payload)
    else:
      raise errors.OpPrereqError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.payload),
                                 errors.ECODE_ENVIRON)

    # if the cluster uses a volume group, verify the new node's PVs
    vg_name = self.cfg.GetVGName()
    if vg_name is not None:
      vparams = {constants.NV_PVLIST: [vg_name]}
      excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
      cname = self.cfg.GetClusterName()
      result = rpcrunner.call_node_verify_light(
          [node_name], vparams, cname,
          self.cfg.GetClusterInfo().hvparams,
          {node_name: node_group},
          self.cfg.GetAllNodeGroupsInfoDict()
        )[node_name]
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
      if errmsgs:
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)

  def _InitOpenVSwitch(self):
    """Configure OpenvSwitch on the new node if its ndparams request it.

    Uses the node's filled (cluster + group + node) ndparams; a missing
    physical link only produces a warning, not an error.

    """
    filled_ndparams = self.cfg.GetClusterInfo().FillND(
      self.new_node, self.cfg.GetNodeGroup(self.new_node.group))

    ovs = filled_ndparams.get(constants.ND_OVS, None)
    ovs_name = filled_ndparams.get(constants.ND_OVS_NAME, None)
    ovs_link = filled_ndparams.get(constants.ND_OVS_LINK, None)

    if ovs:
      if not ovs_link:
        self.LogInfo("No physical interface for OpenvSwitch was given."
                     " OpenvSwitch will not have an outside connection. This"
                     " might not be what you want.")

      result = self.rpc.call_node_configure_ovs(
                 self.new_node.name, ovs_name, ovs_link)
      result.Raise("Failed to initialize OpenVSwitch on new node")

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # We are adding a new node, so we assume it is powered on
    self.new_node.powered = True

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      self.new_node.offline = False
      self.new_node.drained = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      self.new_node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        self.new_node.primary_ip = self.op.primary_ip

    # copy the master/vm_capable flags
    for attr in self._NFLAGS:
      setattr(self.new_node, attr, getattr(self.op, attr))

    # notify the user about any possible mc promotion
    if self.new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    if self.op.ndparams:
      self.new_node.ndparams = self.op.ndparams
    else:
      self.new_node.ndparams = {}

    if self.op.hv_state:
      self.new_node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.new_node.disk_state_static = self.new_disk_state

    # Add the node to /etc/hosts on the master, if the cluster manages it
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(
                 master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
                 self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    # multi-homed node: verify it actually has the secondary IP configured
    if self.new_node.secondary_ip != self.new_node.primary_ip:
      _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
                               False)

    # run ssh/hostname verification of the new node from the master
    node_verifier_uuids = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: ([self.new_node.name], {}),
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(
               node_verifier_uuids, node_verify_param,
               self.cfg.GetClusterName(),
               self.cfg.GetClusterInfo().hvparams,
               {self.new_node.name: self.cfg.LookupNodeGroup(self.op.group)},
               self.cfg.GetAllNodeGroupsInfoDict()
               )
    for verifier in node_verifier_uuids:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        # report every failure before aborting
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed")

    self._InitOpenVSwitch()

    if self.op.readd:
      self.context.ReaddNode(self.new_node)
      RedistributeAncillaryFiles(self)
      # make sure we redistribute the config
      self.cfg.Update(self.new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not self.new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
        result.Warn("Node failed to demote itself from master candidate status",
                    self.LogWarning)
    else:
      self.context.AddNode(self.new_node, self.proc.GetECId())
      RedistributeAncillaryFiles(self)
416

    
417

    
418
class LUNodeSetParams(LogicalUnit):
419
  """Modifies the parameters of a node.
420

421
  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
422
      to the node role (as _ROLE_*)
423
  @cvar _R2F: a dictionary from node role to tuples of flags
424
  @cvar _FLAGS: a list of attribute names corresponding to the flags
425

426
  """
427
  HPATH = "node-modify"
428
  HTYPE = constants.HTYPE_NODE
429
  REQ_BGL = False
430
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
431
  _F2R = {
432
    (True, False, False): _ROLE_CANDIDATE,
433
    (False, True, False): _ROLE_DRAINED,
434
    (False, False, True): _ROLE_OFFLINE,
435
    (False, False, False): _ROLE_REGULAR,
436
    }
437
  _R2F = dict((v, k) for k, v in _F2R.items())
438
  _FLAGS = ["master_candidate", "drained", "offline"]
439

    
440
  def CheckArguments(self):
    """Resolve the target node and validate the requested modifications."""
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    modifications = [self.op.offline, self.op.master_candidate,
                     self.op.drained, self.op.master_capable,
                     self.op.vm_capable, self.op.secondary_ip,
                     self.op.ndparams, self.op.hv_state, self.op.disk_state]

    if all(mod is None for mod in modifications):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)

    # offline/drained/master_candidate are mutually exclusive states
    if modifications.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # Boolean value that tells us whether we might be demoting from MC
    self.might_demote = (self.op.master_candidate is False or
                         self.op.offline is True or
                         self.op.drained is True or
                         self.op.master_capable is False)

    if self.op.secondary_ip and \
        not netutils.IP4Address.IsValid(self.op.secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % self.op.secondary_ip,
                                 errors.ECODE_INVAL)

    # demotions may require promoting another node, hence locking everything
    self.lock_all = self.op.auto_promote and self.might_demote
    self.lock_instances = self.op.secondary_ip is not None
469

    
470
  def _InstanceFilter(self, instance):
    """Tell whether an instance is affected by this node modification.

    Only instances using internally mirrored disks that involve the
    target node are affected.

    """
    if instance.disk_template not in constants.DTS_INT_MIRROR:
      return False
    return self.op.node_uuid in instance.all_nodes
476

    
477
  def ExpandNames(self):
    """Compute the lock sets needed for modifying the node."""
    if self.lock_all:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,

        # Block allocations when all nodes are locked
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }
    else:
      self.needed_locks = {locking.LEVEL_NODE: self.op.node_uuid}

    # Since modifying a node can have severe effects on currently running
    # operations the resource lock is at least acquired in shared mode
    self.needed_locks[locking.LEVEL_NODE_RES] = \
      self.needed_locks[locking.LEVEL_NODE]

    # Everything else is shared (read-only access); the node-related
    # levels must be held exclusively
    self.share_locks = ShareAll()
    for level in (locking.LEVEL_NODE, locking.LEVEL_NODE_RES,
                  locking.LEVEL_NODE_ALLOC):
      self.share_locks[level] = 0

    if self.lock_instances:
      affected = self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(affected.keys())
506

    
507
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    op = self.op
    return {
      "OP_TARGET": op.node_name,
      "MASTER_CANDIDATE": str(op.master_candidate),
      "OFFLINE": str(op.offline),
      "DRAINED": str(op.drained),
      "MASTER_CAPABLE": str(op.master_capable),
      "VM_CAPABLE": str(op.vm_capable),
      }
521

    
522
  def BuildHooksNodes(self):
    """Build hooks nodes: the master plus the node being modified."""
    hook_nodes = [self.cfg.GetMasterNode(), self.op.node_uuid]
    return (hook_nodes, hook_nodes)
528

    
529
  def CheckPrereq(self):
530
    """Check prerequisites.
531

532
    This only checks the instance list against the existing names.
533

534
    """
535
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
536
    if self.lock_instances:
537
      affected_instances = \
538
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
539

    
540
      # Verify instance locks
541
      owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
542
      wanted_instance_names = frozenset([inst.name for inst in
543
                                         affected_instances.values()])
544
      if wanted_instance_names - owned_instance_names:
545
        raise errors.OpPrereqError("Instances affected by changing node %s's"
546
                                   " secondary IP address have changed since"
547
                                   " locks were acquired, wanted '%s', have"
548
                                   " '%s'; retry the operation" %
549
                                   (node.name,
550
                                    utils.CommaJoin(wanted_instance_names),
551
                                    utils.CommaJoin(owned_instance_names)),
552
                                   errors.ECODE_STATE)
553
    else:
554
      affected_instances = None
555

    
556
    if (self.op.master_candidate is not None or
557
        self.op.drained is not None or
558
        self.op.offline is not None):
559
      # we can't change the master's node flags
560
      if node.uuid == self.cfg.GetMasterNode():
561
        raise errors.OpPrereqError("The master role can be changed"
562
                                   " only via master-failover",
563
                                   errors.ECODE_INVAL)
564

    
565
    if self.op.master_candidate and not node.master_capable:
566
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
567
                                 " it a master candidate" % node.name,
568
                                 errors.ECODE_STATE)
569

    
570
    if self.op.vm_capable is False:
571
      (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
572
      if ipri or isec:
573
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
574
                                   " the vm_capable flag" % node.name,
575
                                   errors.ECODE_STATE)
576

    
577
    if node.master_candidate and self.might_demote and not self.lock_all:
578
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
579
      # check if after removing the current node, we're missing master
580
      # candidates
581
      (mc_remaining, mc_should, _) = \
582
          self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
583
      if mc_remaining < mc_should:
584
        raise errors.OpPrereqError("Not enough master candidates, please"
585
                                   " pass auto promote option to allow"
586
                                   " promotion (--auto-promote or RAPI"
587
                                   " auto_promote=True)", errors.ECODE_STATE)
588

    
589
    self.old_flags = old_flags = (node.master_candidate,
590
                                  node.drained, node.offline)
591
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
592
    self.old_role = old_role = self._F2R[old_flags]
593

    
594
    # Check for ineffective changes
595
    for attr in self._FLAGS:
596
      if getattr(self.op, attr) is False and getattr(node, attr) is False:
597
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
598
        setattr(self.op, attr, None)
599

    
600
    # Past this point, any flag change to False means a transition
601
    # away from the respective state, as only real changes are kept
602

    
603
    # TODO: We might query the real power state if it supports OOB
604
    if SupportsOob(self.cfg, node):
605
      if self.op.offline is False and not (node.powered or
606
                                           self.op.powered is True):
607
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
608
                                    " offline status can be reset") %
609
                                   self.op.node_name, errors.ECODE_STATE)
610
    elif self.op.powered is not None:
611
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
612
                                  " as it does not support out-of-band"
613
                                  " handling") % self.op.node_name,
614
                                 errors.ECODE_STATE)
615

    
616
    # If we're being deofflined/drained, we'll MC ourself if needed
617
    if (self.op.drained is False or self.op.offline is False or
618
        (self.op.master_capable and not node.master_capable)):
619
      if _DecideSelfPromotion(self):
620
        self.op.master_candidate = True
621
        self.LogInfo("Auto-promoting node to master candidate")
622

    
623
    # If we're no longer master capable, we'll demote ourselves from MC
624
    if self.op.master_capable is False and node.master_candidate:
625
      self.LogInfo("Demoting from master candidate")
626
      self.op.master_candidate = False
627

    
628
    # Compute new role
629
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
630
    if self.op.master_candidate:
631
      new_role = self._ROLE_CANDIDATE
632
    elif self.op.drained:
633
      new_role = self._ROLE_DRAINED
634
    elif self.op.offline:
635
      new_role = self._ROLE_OFFLINE
636
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
637
      # False is still in new flags, which means we're un-setting (the
638
      # only) True flag
639
      new_role = self._ROLE_REGULAR
640
    else: # no new flags, nothing, keep old role
641
      new_role = old_role
642

    
643
    self.new_role = new_role
644

    
645
    if old_role == self._ROLE_OFFLINE and new_role != old_role:
646
      # Trying to transition out of offline status
647
      result = self.rpc.call_version([node.uuid])[node.uuid]
648
      if result.fail_msg:
649
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
650
                                   " to report its version: %s" %
651
                                   (node.name, result.fail_msg),
652
                                   errors.ECODE_STATE)
653
      else:
654
        self.LogWarning("Transitioning node from offline to online state"
655
                        " without using re-add. Please make sure the node"
656
                        " is healthy!")
657

    
658
    # When changing the secondary ip, verify if this is a single-homed to
659
    # multi-homed transition or vice versa, and apply the relevant
660
    # restrictions.
661
    if self.op.secondary_ip:
662
      # Ok even without locking, because this can't be changed by any LU
663
      master = self.cfg.GetMasterNodeInfo()
664
      master_singlehomed = master.secondary_ip == master.primary_ip
665
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
666
        if self.op.force and node.uuid == master.uuid:
667
          self.LogWarning("Transitioning from single-homed to multi-homed"
668
                          " cluster; all nodes will require a secondary IP"
669
                          " address")
670
        else:
671
          raise errors.OpPrereqError("Changing the secondary ip on a"
672
                                     " single-homed cluster requires the"
673
                                     " --force option to be passed, and the"
674
                                     " target node to be the master",
675
                                     errors.ECODE_INVAL)
676
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
677
        if self.op.force and node.uuid == master.uuid:
678
          self.LogWarning("Transitioning from multi-homed to single-homed"
679
                          " cluster; secondary IP addresses will have to be"
680
                          " removed")
681
        else:
682
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
683
                                     " same as the primary IP on a multi-homed"
684
                                     " cluster, unless the --force option is"
685
                                     " passed, and the target node is the"
686
                                     " master", errors.ECODE_INVAL)
687

    
688
      assert not (set([inst.name for inst in affected_instances.values()]) -
689
                  self.owned_locks(locking.LEVEL_INSTANCE))
690

    
691
      if node.offline:
692
        if affected_instances:
693
          msg = ("Cannot change secondary IP address: offline node has"
694
                 " instances (%s) configured to use it" %
695
                 utils.CommaJoin(
696
                   [inst.name for inst in affected_instances.values()]))
697
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
698
      else:
699
        # On online nodes, check that no instances are running, and that
700
        # the node has the new ip and we can reach it.
701
        for instance in affected_instances.values():
702
          CheckInstanceState(self, instance, INSTANCE_DOWN,
703
                             msg="cannot change secondary ip")
704

    
705
        _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
706
        if master.uuid != node.uuid:
707
          # check reachability from master secondary ip to new secondary ip
708
          if not netutils.TcpPing(self.op.secondary_ip,
709
                                  constants.DEFAULT_NODED_PORT,
710
                                  source=master.secondary_ip):
711
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
712
                                       " based ping to node daemon port",
713
                                       errors.ECODE_ENVIRON)
714

    
715
    if self.op.ndparams:
716
      new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
717
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
718
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
719
                           "node", "cluster or group")
720
      self.new_ndparams = new_ndparams
721

    
722
    if self.op.hv_state:
723
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
724
                                                node.hv_state_static)
725

    
726
    if self.op.disk_state:
727
      self.new_disk_state = \
728
        MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)
729

    
730
  def Exec(self, feedback_fn):
    """Modifies a node.

    Applies the changes validated in CheckPrereq to the node object,
    writes the updated node back to the configuration and returns the
    list of (parameter, new value) pairs that were changed.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    # collects (name, value) pairs of everything that was actually changed
    result = []

    if self.op.ndparams:
      # new_ndparams was merged and validated in CheckPrereq
      node.ndparams = self.new_ndparams

    if self.op.powered is not None:
      node.powered = self.op.powered

    if self.op.hv_state:
      node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      node.disk_state_static = self.new_disk_state

    # simple boolean capability flags are copied verbatim from the opcode
    for attr in ["master_capable", "vm_capable"]:
      val = getattr(self.op, attr)
      if val is not None:
        setattr(node, attr, val)
        result.append((attr, str(val)))

    if self.new_role != self.old_role:
      # Tell the node to demote itself, if no longer MC and not offline
      if self.old_role == self._ROLE_CANDIDATE and \
          self.new_role != self._ROLE_OFFLINE:
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
        if msg:
          # best-effort: a failed demotion is only warned about
          self.LogWarning("Node failed to demote itself: %s", msg)

      # translate the new role back into the (mc, drained, offline) flags
      new_flags = self._R2F[self.new_role]
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
        if of != nf:
          result.append((desc, str(nf)))
      (node.master_candidate, node.drained, node.offline) = new_flags

      # we locked all nodes, we adjust the CP before updating this node
      if self.lock_all:
        AdjustCandidatePool(self, [node.uuid])

    if self.op.secondary_ip:
      node.secondary_ip = self.op.secondary_ip
      result.append(("secondary_ip", self.op.secondary_ip))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)

    # this will trigger job queue propagation or cleanup if the mc
    # flag changed
    if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
      self.context.ReaddNode(node)

    return result
786

    
787

    
788
class LUNodePowercycle(NoHooksLU):
  """Powercycles a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    # powercycling the master is only allowed when explicitly forced
    is_master = (self.op.node_uuid == self.cfg.GetMasterNode())
    if is_master and not self.op.force:
      raise errors.OpPrereqError(
        "The node is the master and the force parameter was not set",
        errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    hv_type = self.cfg.GetHypervisorType()
    hv_params = self.cfg.GetClusterInfo().hvparams[hv_type]
    rpc_result = self.rpc.call_node_powercycle(self.op.node_uuid, hv_type,
                                               hv_params)
    rpc_result.Raise("Failed to schedule the reboot")
    return rpc_result.payload
823

    
824

    
825
def _GetNodeInstancesInner(cfg, fn):
826
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
827

    
828

    
829
def _GetNodePrimaryInstances(cfg, node_uuid):
  """Returns primary instances on a node.

  """
  def _IsPrimaryHere(inst):
    # an instance counts iff the given node is its primary node
    return inst.primary_node == node_uuid

  return _GetNodeInstancesInner(cfg, _IsPrimaryHere)
835

    
836

    
837
def _GetNodeSecondaryInstances(cfg, node_uuid):
  """Returns secondary instances on a node.

  """
  def _IsSecondaryHere(inst):
    # an instance counts iff the given node is among its secondaries
    return node_uuid in inst.secondary_nodes

  return _GetNodeInstancesInner(cfg, _IsSecondaryHere)
843

    
844

    
845
def _GetNodeInstances(cfg, node_uuid):
  """Returns a list of all primary and secondary instances on a node.

  """
  def _UsesNode(inst):
    # matches both primary and secondary usage of the node
    return node_uuid in inst.all_nodes

  return _GetNodeInstancesInner(cfg, _UsesNode)
851

    
852

    
853
class LUNodeEvacuate(NoHooksLU):
  """Evacuates instances off a list of nodes.

  Depending on the mode, either primary, secondary or all instances
  are moved away from the given node, either via an iallocator or onto
  an explicitly given remote node (secondary evacuation only). The LU
  only computes and returns the jobs performing the actual moves.

  """
  REQ_BGL = False

  def CheckArguments(self):
    # exactly one of iallocator/remote_node must be given
    CheckIAllocatorOrNode(self, "iallocator", "remote_node")

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)
      assert self.op.remote_node

      if self.op.node_uuid == self.op.remote_node_uuid:
        raise errors.OpPrereqError("Can not use evacuated node as a new"
                                   " secondary node", errors.ECODE_INVAL)

      # without an iallocator, only the DRBD secondary can be re-homed
      if self.op.mode != constants.NODE_EVAC_SEC:
        raise errors.OpPrereqError("Without the use of an iallocator only"
                                   " secondary instances can be evacuated",
                                   errors.ECODE_INVAL)

    # Declare locks
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

    # Determine nodes (via group) optimistically, needs verification once locks
    # have been acquired
    self.lock_nodes = self._DetermineNodes()

  def _DetermineNodes(self):
    """Gets the list of node UUIDs to operate on.

    @rtype: set
    @return: UUIDs of the evacuated node plus all potential target nodes

    """
    if self.op.remote_node is None:
      # Iallocator will choose any node(s) in the same group
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
    else:
      group_nodes = frozenset([self.op.remote_node_uuid])

    # Determine nodes to be locked
    return set([self.op.node_uuid]) | group_nodes

  def _DetermineInstances(self):
    """Builds list of instances to operate on.

    @raise errors.OpPrereqError: for mode "all", which the iallocator
        interface cannot express

    """
    assert self.op.mode in constants.NODE_EVAC_MODES

    if self.op.mode == constants.NODE_EVAC_PRI:
      # Primary instances only
      inst_fn = _GetNodePrimaryInstances
      assert self.op.remote_node is None, \
        "Evacuating primary instances requires iallocator"
    elif self.op.mode == constants.NODE_EVAC_SEC:
      # Secondary instances only
      inst_fn = _GetNodeSecondaryInstances
    else:
      # All instances
      assert self.op.mode == constants.NODE_EVAC_ALL
      # TODO: In 2.6, change the iallocator interface to take an evacuation mode
      # per instance
      raise errors.OpPrereqError("Due to an issue with the iallocator"
                                 " interface it is not possible to evacuate"
                                 " all instances at once; specify explicitly"
                                 " whether to evacuate primary or secondary"
                                 " instances",
                                 errors.ECODE_INVAL)

    return inst_fn(self.cfg, self.op.node_uuid)

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        set(i.name for i in self._DetermineInstances())

    elif level == locking.LEVEL_NODEGROUP:
      # Lock node groups for all potential target nodes optimistically, needs
      # verification once nodes have been acquired
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)

    elif level == locking.LEVEL_NODE:
      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes

  def CheckPrereq(self):
    """Verifies the optimistically-taken locks and computes the instances.

    """
    # Verify locks
    owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)

    need_nodes = self._DetermineNodes()

    if not owned_nodes.issuperset(need_nodes):
      # FIX: dropped the duplicated word "are" from the original message
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
                                 " locks were acquired, current nodes are"
                                 " '%s', used to be '%s'; retry the"
                                 " operation" %
                                 (self.op.node_name,
                                  utils.CommaJoin(need_nodes),
                                  utils.CommaJoin(owned_nodes)),
                                 errors.ECODE_STATE)

    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
    if owned_groups != wanted_groups:
      raise errors.OpExecError("Node groups changed since locks were acquired,"
                               " current groups are '%s', used to be '%s';"
                               " retry the operation" %
                               (utils.CommaJoin(wanted_groups),
                                utils.CommaJoin(owned_groups)))

    # Determine affected instances
    self.instances = self._DetermineInstances()
    self.instance_names = [i.name for i in self.instances]

    if set(self.instance_names) != owned_instance_names:
      raise errors.OpExecError("Instances on node '%s' changed since locks"
                               " were acquired, current instances are '%s',"
                               " used to be '%s'; retry the operation" %
                               (self.op.node_name,
                                utils.CommaJoin(self.instance_names),
                                utils.CommaJoin(owned_instance_names)))

    if self.instance_names:
      self.LogInfo("Evacuating instances from node '%s': %s",
                   self.op.node_name,
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
    else:
      self.LogInfo("No instances to evacuate from node '%s'",
                   self.op.node_name)

    if self.op.remote_node is not None:
      # the explicit target must not already be an instance's primary
      for i in self.instances:
        if i.primary_node == self.op.remote_node_uuid:
          raise errors.OpPrereqError("Node %s is the primary node of"
                                     " instance %s, cannot use it as"
                                     " secondary" %
                                     (self.op.remote_node, i.name),
                                     errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Builds and returns the jobs performing the evacuation.

    """
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)

    if not self.instance_names:
      # No instances to evacuate
      jobs = []

    elif self.op.iallocator is not None:
      # TODO: Implement relocation to other group
      req = iallocator.IAReqNodeEvac(evac_mode=self.op.mode,
                                     instances=list(self.instance_names))
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)

      ial.Run(self.op.iallocator)

      if not ial.success:
        raise errors.OpPrereqError("Can't compute node evacuation using"
                                   " iallocator '%s': %s" %
                                   (self.op.iallocator, ial.info),
                                   errors.ECODE_NORES)

      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)

    elif self.op.remote_node is not None:
      assert self.op.mode == constants.NODE_EVAC_SEC
      jobs = [
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
                                        remote_node=self.op.remote_node,
                                        disks=[],
                                        mode=constants.REPLACE_DISK_CHG,
                                        early_release=self.op.early_release)]
        for instance_name in self.instance_names]

    else:
      raise errors.ProgrammerError("No iallocator or remote node")

    return ResultWithJobs(jobs)
1043

    
1044

    
1045
class LUNodeMigrate(LogicalUnit):
  """Migrate all instances from a node.

  """
  HPATH = "node-migrate"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False

  def CheckArguments(self):
    pass

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    return {
      "NODE_NAME": self.op.node_name,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    master_only = [self.cfg.GetMasterNode()]
    return (master_only, master_only)

  def CheckPrereq(self):
    pass

  def Exec(self, feedback_fn):
    """Submits one migration job per primary instance on the node.

    """
    jobs = []
    for instance in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid):
      migrate_op = opcodes.OpInstanceMigrate(
        instance_name=instance.name,
        mode=self.op.mode,
        live=self.op.live,
        iallocator=self.op.iallocator,
        target_node=self.op.target_node,
        allow_runtime_changes=self.op.allow_runtime_changes,
        ignore_ipolicy=self.op.ignore_ipolicy)
      jobs.append([migrate_op])

    # TODO: Run iallocator in this opcode and pass correct placement options to
    # OpInstanceMigrate. Since other jobs can modify the cluster between
    # running the iallocator and the actual migration, a good consistency model
    # will have to be found.

    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset([self.op.node_uuid]))

    return ResultWithJobs(jobs)
1108

    
1109

    
1110
def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  if storage_type != constants.ST_FILE:
    # only file storage needs extra arguments
    return []

  # storage.FileStorage wants a list of storage directories
  return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1120

    
1121

    
1122
class LUNodeModifyStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    # only storage types with a declared modifiable-field set can be changed
    if storage_type not in constants.MODIFIABLE_STORAGE_FIELDS:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)
    modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]

    unsupported = set(self.op.changes.keys()) - modifiable
    if unsupported:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(unsupported)),
                                 errors.ECODE_INVAL)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_uuid,
      }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    storage_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    rpc_result = self.rpc.call_storage_modify(self.op.node_uuid,
                                              self.op.storage_type,
                                              storage_args, self.op.name,
                                              self.op.changes)
    rpc_result.Raise("Failed to modify storage unit '%s' on %s" %
                     (self.op.name, self.op.node_name))
1169

    
1170

    
1171
def _CheckOutputFields(fields, selected):
1172
  """Checks whether all selected fields are valid according to fields.
1173

1174
  @type fields: L{utils.FieldSet}
1175
  @param fields: fields set
1176
  @type selected: L{utils.FieldSet}
1177
  @param selected: fields set
1178

1179
  """
1180
  delta = fields.NonMatching(selected)
1181
  if delta:
1182
    raise errors.OpPrereqError("Unknown output fields selected: %s"
1183
                               % ",".join(delta), errors.ECODE_INVAL)
1184

    
1185

    
1186
class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    # reject any requested output field outside the supported volume fields
    _CheckOutputFields(utils.FieldSet(constants.VF_NODE, constants.VF_PHYS,
                                      constants.VF_VG, constants.VF_NAME,
                                      constants.VF_SIZE, constants.VF_INSTANCE),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      # restrict to the explicitly requested nodes
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      # no node list given: query the whole cluster
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    Returns one row (list of stringified field values) per volume per
    reachable node; offline or failing nodes are skipped with a warning.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    volumes = self.rpc.call_node_volumes(node_uuids)

    ilist = self.cfg.GetAllInstancesInfo()
    # maps (node_uuid, "vg/lv") to the instance owning that volume
    vol2inst = MapInstanceLvsToNodes(ilist.values())

    output = []
    for node_uuid in node_uuids:
      nresult = volumes[node_uuid]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        # best-effort query: log and skip nodes that failed to answer
        self.LogWarning("Can't compute volume data on node %s: %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue

      # stable ordering by device path
      node_vols = sorted(nresult.payload,
                         key=operator.itemgetter(constants.VF_DEV))

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == constants.VF_NODE:
            val = self.cfg.GetNodeName(node_uuid)
          elif field == constants.VF_PHYS:
            val = vol[constants.VF_DEV]
          elif field == constants.VF_VG:
            val = vol[constants.VF_VG]
          elif field == constants.VF_NAME:
            val = vol[constants.VF_NAME]
          elif field == constants.VF_SIZE:
            # size arrives as a string possibly with a fractional part
            val = int(float(vol[constants.VF_SIZE]))
          elif field == constants.VF_INSTANCE:
            inst = vol2inst.get((node_uuid, vol[constants.VF_VG] + "/" +
                                 vol[constants.VF_NAME]), None)
            if inst is not None:
              val = inst.name
            else:
              # volume not owned by any instance
              val = "-"
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output
1262

    
1263

    
1264
class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      # restrict to the explicitly requested nodes
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      # no node list given: query the whole cluster
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def _DetermineStorageType(self):
    """Determines the default storage type of the cluster.

    Derived from the first enabled disk template.

    """
    enabled_disk_templates = self.cfg.GetClusterInfo().enabled_disk_templates
    default_storage_type = \
        constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[enabled_disk_templates[0]]
    return default_storage_type

  def CheckPrereq(self):
    """Check prerequisites.

    Either validates the explicitly requested storage type or falls
    back to the cluster default, which must support storage reporting.

    """
    if self.op.storage_type:
      CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()
      if self.storage_type not in constants.STS_REPORT:
        # Consistency fix: pass an explicit error code like every other
        # OpPrereqError raised in this module
        raise errors.OpPrereqError(
            "Storage reporting for storage type '%s' is not supported. Please"
            " use the --storage-type option to specify one of the supported"
            " storage types (%s) or set the default disk template to one that"
            " supports storage reporting." %
            (self.storage_type, utils.CommaJoin(constants.STS_REPORT)),
            errors.ECODE_STATE)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    if self.op.storage_type:
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()

    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      nresult = data[node_uuid]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        # best-effort query: log and skip nodes that failed to answer
        self.LogWarning("Can't get storage data from node %s: %s",
                        node_name, msg)
        continue

      # sort rows by unit name for stable output
      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node_name
          elif field == constants.SF_TYPE:
            val = self.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result
1379

    
1380

    
1381
class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      # exclude the node being removed; it may already be gone
      all_nodes.remove(self.op.node_uuid)
    except ValueError:
      pass
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    assert node is not None

    masternode = self.cfg.GetMasterNode()
    if node.uuid == masternode:
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)

    # refuse removal while any instance still uses the node (as primary
    # or secondary)
    for _, instance in self.cfg.GetAllInstancesInfo().items():
      if node.uuid in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first" % instance.name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    logging.info("Stopping the node daemon and removing configs from node %s",
                 self.node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # Promote nodes to master candidate as needed
    AdjustCandidatePool(self, exceptions=[self.node.uuid])
    self.context.RemoveNode(self.node)

    # Run post hooks on the node before it's removed
    RunPostHook(self, self.node.name)

    # we have to call this by name rather than by UUID, as the node is no longer
    # in the config
    result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      # best-effort: removal proceeds even if the node failed to clean up
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node_uuid = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node_uuid,
                                              constants.ETC_HOSTS_REMOVE,
                                              self.node.name, None)
      result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)
1475

    
1476

    
1477
class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    """Verify that the requested storage type supports a consistency repair.

    Also expands the node name/UUID given in the opcode.

    @raise errors.OpPrereqError: if the storage type does not support the
        C{SO_FIX_CONSISTENCY} operation

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    if (constants.SO_FIX_CONSISTENCY not in
        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " repaired" % storage_type,
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Lock only the node whose storage is to be repaired."""
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def _CheckFaultyDisks(self, instance, node_uuid):
    """Ensure faulty disks abort the opcode or at least warn."""
    try:
      if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                 node_uuid, True):
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                   " node '%s'" %
                                   (instance.name,
                                    self.cfg.GetNodeName(node_uuid)),
                                   errors.ECODE_STATE)
    # FIX: "except X, err" is Python-2-only syntax (a syntax error on
    # Python 3); "except X as err" is equivalent and valid from 2.6 on
    except errors.OpPrereqError as err:
      if self.op.ignore_consistency:
        # Degrade the prerequisite failure to a warning when requested
        self.LogWarning(str(err.args[0]))
      else:
        raise

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

    # Check whether any instance on this node has faulty disks
    for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
      if not inst.disks_active:
        continue
      # Look at the instance's other nodes; the repair target itself is
      # expected to be inconsistent, that's what we're fixing
      check_nodes = set(inst.all_nodes)
      check_nodes.discard(self.op.node_uuid)
      for inst_node_uuid in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_uuid)

  def Exec(self, feedback_fn):
    """Run the repair operation on the target node via RPC."""
    feedback_fn("Repairing storage unit '%s' on %s ..." %
                (self.op.name, self.op.node_name))

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_execute(self.op.node_uuid,
                                           self.op.storage_type, st_args,
                                           self.op.name,
                                           constants.SO_FIX_CONSISTENCY)
    result.Raise("Failed to repair storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))