#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with nodes."""

import logging
import operator

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import rpc
from ganeti import utils
from ganeti.masterd import iallocator

from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, ResultWithJobs
from ganeti.cmdlib.common import CheckParamsNotGlobal, \
  MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  IsExclusiveStorageEnabledNode, CheckNodePVs, \
  RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
  CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
  AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
  GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
  FindFaultyInstanceDisks, CheckStorageTypeEnabled

48
def _DecideSelfPromotion(lu, exceptions=None):
49
  """Decide whether I should promote myself as a master candidate.
50

51
  """
52
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
53
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
54
  # the new node will increase mc_max by one, so:
55
  mc_should = min(mc_should + 1, cp_size)
56
  return mc_now < mc_should
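# Illustrative arithmetic for the check above (assumed numbers, not from a
# real cluster): with candidate_pool_size = 10 and GetMasterCandidateStats()
# reporting mc_now = 7 and mc_should = 8, the adjusted target becomes
# min(8 + 1, 10) = 9, so 7 < 9 and the caller promotes itself.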
57

    
58

    
59
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
60
  """Ensure that a node has the given secondary ip.
61

62
  @type lu: L{LogicalUnit}
63
  @param lu: the LU on behalf of which we make the check
64
  @type node: L{objects.Node}
65
  @param node: the node to check
66
  @type secondary_ip: string
67
  @param secondary_ip: the ip to check
68
  @type prereq: boolean
69
  @param prereq: whether to throw a prerequisite or an execute error
70
  @raise errors.OpPrereqError: if the node doesn't have the ip,
71
  and prereq=True
72
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
73

74
  """
75
  # this can be called with a new node, which has no UUID yet, so perform the
76
  # RPC call using its name
77
  result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
78
  result.Raise("Failure checking secondary ip on node %s" % node.name,
79
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
80
  if not result.payload:
81
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
82
           " please fix and re-run this command" % secondary_ip)
83
    if prereq:
84
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
85
    else:
86
      raise errors.OpExecError(msg)
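# Illustrative use (hypothetical secondary IP): prereq-time callers pass
# prereq=True so a missing address surfaces as OpPrereqError, e.g.
#   _CheckNodeHasSecondaryIP(self, node, "192.0.2.11", True)
# while Exec-time callers pass prereq=False and get an OpExecError instead.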
87

    
88

    
89
class LUNodeAdd(LogicalUnit):
90
  """Logical unit for adding node to the cluster.
91

92
  """
93
  HPATH = "node-add"
94
  HTYPE = constants.HTYPE_NODE
95
  _NFLAGS = ["master_capable", "vm_capable"]
96

    
97
  def CheckArguments(self):
98
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
99
    # validate/normalize the node name
100
    self.hostname = netutils.GetHostname(name=self.op.node_name,
101
                                         family=self.primary_ip_family)
102
    self.op.node_name = self.hostname.name
103

    
104
    if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
105
      raise errors.OpPrereqError("Cannot readd the master node",
106
                                 errors.ECODE_STATE)
107

    
108
    if self.op.readd and self.op.group:
109
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
110
                                 " being readded", errors.ECODE_INVAL)
111

    
112
  def BuildHooksEnv(self):
113
    """Build hooks env.
114

115
    This will run on all nodes before, and on all nodes + the new node after.
116

117
    """
118
    return {
119
      "OP_TARGET": self.op.node_name,
120
      "NODE_NAME": self.op.node_name,
121
      "NODE_PIP": self.op.primary_ip,
122
      "NODE_SIP": self.op.secondary_ip,
123
      "MASTER_CAPABLE": str(self.op.master_capable),
124
      "VM_CAPABLE": str(self.op.vm_capable),
125
      }
126

    
127
  def BuildHooksNodes(self):
128
    """Build hooks nodes.
129

130
    """
131
    hook_nodes = self.cfg.GetNodeList()
132
    new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
133
    if new_node_info is not None:
134
      # Exclude added node
135
      hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))
136

    
137
    # add the new node as a post hook node by name; it has no UUID yet
138
    return (hook_nodes, hook_nodes, [self.op.node_name, ])
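  # Illustrative result (made-up identifiers): with existing node UUIDs
  # ["uuid-1", "uuid-2"] and a new node "node4.example.com", this returns
  # (["uuid-1", "uuid-2"], ["uuid-1", "uuid-2"], ["node4.example.com"]):
  # pre and post hooks run on the existing nodes, and the post hook also
  # targets the new node by name because it has no UUID yet.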
139

    
140
  def CheckPrereq(self):
141
    """Check prerequisites.
142

143
    This checks:
144
     - the new node is not already in the config
145
     - it is resolvable
146
     - its parameters (single/dual homed) matches the cluster
147

148
    Any errors are signaled by raising errors.OpPrereqError.
149

150
    """
151
    node_name = self.hostname.name
152
    self.op.primary_ip = self.hostname.ip
153
    if self.op.secondary_ip is None:
154
      if self.primary_ip_family == netutils.IP6Address.family:
155
        raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
156
                                   " IPv4 address must be given as secondary",
157
                                   errors.ECODE_INVAL)
158
      self.op.secondary_ip = self.op.primary_ip
159

    
160
    secondary_ip = self.op.secondary_ip
161
    if not netutils.IP4Address.IsValid(secondary_ip):
162
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
163
                                 " address" % secondary_ip, errors.ECODE_INVAL)
164

    
165
    existing_node_info = self.cfg.GetNodeInfoByName(node_name)
166
    if not self.op.readd and existing_node_info is not None:
167
      raise errors.OpPrereqError("Node %s is already in the configuration" %
168
                                 node_name, errors.ECODE_EXISTS)
169
    elif self.op.readd and existing_node_info is None:
170
      raise errors.OpPrereqError("Node %s is not in the configuration" %
171
                                 node_name, errors.ECODE_NOENT)
172

    
173
    self.changed_primary_ip = False
174

    
175
    for existing_node in self.cfg.GetAllNodesInfo().values():
176
      if self.op.readd and node_name == existing_node.name:
177
        if existing_node.secondary_ip != secondary_ip:
178
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
179
                                     " address configuration as before",
180
                                     errors.ECODE_INVAL)
181
        if existing_node.primary_ip != self.op.primary_ip:
182
          self.changed_primary_ip = True
183

    
184
        continue
185

    
186
      if (existing_node.primary_ip == self.op.primary_ip or
187
          existing_node.secondary_ip == self.op.primary_ip or
188
          existing_node.primary_ip == secondary_ip or
189
          existing_node.secondary_ip == secondary_ip):
190
        raise errors.OpPrereqError("New node ip address(es) conflict with"
191
                                   " existing node %s" % existing_node.name,
192
                                   errors.ECODE_NOTUNIQUE)
193

    
194
    # After this 'if' block, None is no longer a valid value for the
195
    # _capable op attributes
196
    if self.op.readd:
197
      assert existing_node_info is not None, \
198
        "Can't retrieve locked node %s" % node_name
199
      for attr in self._NFLAGS:
200
        if getattr(self.op, attr) is None:
201
          setattr(self.op, attr, getattr(existing_node_info, attr))
202
    else:
203
      for attr in self._NFLAGS:
204
        if getattr(self.op, attr) is None:
205
          setattr(self.op, attr, True)
206

    
207
    if self.op.readd and not self.op.vm_capable:
208
      pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
209
      if pri or sec:
210
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
211
                                   " flag set to false, but it already holds"
212
                                   " instances" % node_name,
213
                                   errors.ECODE_STATE)
214

    
215
    # check that the type of the node (single versus dual homed) is the
216
    # same as for the master
217
    myself = self.cfg.GetMasterNodeInfo()
218
    master_singlehomed = myself.secondary_ip == myself.primary_ip
219
    newbie_singlehomed = secondary_ip == self.op.primary_ip
220
    if master_singlehomed != newbie_singlehomed:
221
      if master_singlehomed:
222
        raise errors.OpPrereqError("The master has no secondary ip but the"
223
                                   " new node has one",
224
                                   errors.ECODE_INVAL)
225
      else:
226
        raise errors.OpPrereqError("The master has a secondary ip but the"
227
                                   " new node doesn't have one",
228
                                   errors.ECODE_INVAL)
229

    
230
    # check that the new node's primary IP is reachable
231
    if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
232
      raise errors.OpPrereqError("Node not reachable by ping",
233
                                 errors.ECODE_ENVIRON)
234

    
235
    if not newbie_singlehomed:
236
      # check reachability from my secondary ip to newbie's secondary ip
237
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
238
                              source=myself.secondary_ip):
239
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
240
                                   " based ping to node daemon port",
241
                                   errors.ECODE_ENVIRON)
242

    
243
    if self.op.readd:
244
      exceptions = [existing_node_info.uuid]
245
    else:
246
      exceptions = []
247

    
248
    if self.op.master_capable:
249
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
250
    else:
251
      self.master_candidate = False
252

    
253
    node_group = self.cfg.LookupNodeGroup(self.op.group)
254

    
255
    if self.op.readd:
256
      self.new_node = existing_node_info
257
    else:
258
      self.new_node = objects.Node(name=node_name,
259
                                   primary_ip=self.op.primary_ip,
260
                                   secondary_ip=secondary_ip,
261
                                   master_candidate=self.master_candidate,
262
                                   offline=False, drained=False,
263
                                   group=node_group, ndparams={})
264

    
265
    if self.op.ndparams:
266
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
267
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
268
                           "node", "cluster or group")
269

    
270
    if self.op.hv_state:
271
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)
272

    
273
    if self.op.disk_state:
274
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)
275

    
276
    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
277
    #       it a property on the base class.
278
    rpcrunner = rpc.DnsOnlyRunner()
279
    result = rpcrunner.call_version([node_name])[node_name]
280
    result.Raise("Can't get version information from node %s" % node_name)
281
    if constants.PROTOCOL_VERSION == result.payload:
282
      logging.info("Communication to node %s fine, sw version %s match",
283
                   node_name, result.payload)
284
    else:
285
      raise errors.OpPrereqError("Version mismatch master version %s,"
286
                                 " node version %s" %
287
                                 (constants.PROTOCOL_VERSION, result.payload),
288
                                 errors.ECODE_ENVIRON)
289

    
290
    vg_name = self.cfg.GetVGName()
291
    if vg_name is not None:
292
      vparams = {constants.NV_PVLIST: [vg_name]}
293
      excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
294
      cname = self.cfg.GetClusterName()
295
      result = rpcrunner.call_node_verify_light(
296
          [node_name], vparams, cname,
297
          self.cfg.GetClusterInfo().hvparams,
298
          {node_name: node_group},
299
          self.cfg.GetAllNodeGroupsInfoDict()
300
        )[node_name]
301
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
302
      if errmsgs:
303
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
304
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)
305

    
306
  def _InitOpenVSwitch(self):
307
    filled_ndparams = self.cfg.GetClusterInfo().FillND(
308
      self.new_node, self.cfg.GetNodeGroup(self.new_node.group))
309

    
310
    ovs = filled_ndparams.get(constants.ND_OVS, None)
311
    ovs_name = filled_ndparams.get(constants.ND_OVS_NAME, None)
312
    ovs_link = filled_ndparams.get(constants.ND_OVS_LINK, None)
313

    
314
    if ovs:
315
      if not ovs_link:
316
        self.LogInfo("No physical interface for OpenvSwitch was given."
317
                     " OpenvSwitch will not have an outside connection. This"
318
                     " might not be what you want.")
319

    
320
      result = self.rpc.call_node_configure_ovs(
321
                 self.new_node.name, ovs_name, ovs_link)
322
      result.Raise("Failed to initialize OpenVSwitch on new node")
323

    
324
  def Exec(self, feedback_fn):
325
    """Adds the new node to the cluster.
326

327
    """
328
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
329
      "Not owning BGL"
330

    
331
    # We are adding a new node, so we assume it's powered
332
    self.new_node.powered = True
333

    
334
    # for re-adds, reset the offline/drained/master-candidate flags;
335
    # we need to reset here, otherwise offline would prevent RPC calls
336
    # later in the procedure; this also means that if the re-add
337
    # fails, we are left with a non-offlined, broken node
338
    if self.op.readd:
339
      self.new_node.offline = False
340
      self.new_node.drained = False
341
      self.LogInfo("Readding a node, the offline/drained flags were reset")
342
      # if we demote the node, we do cleanup later in the procedure
343
      self.new_node.master_candidate = self.master_candidate
344
      if self.changed_primary_ip:
345
        self.new_node.primary_ip = self.op.primary_ip
346

    
347
    # copy the master/vm_capable flags
348
    for attr in self._NFLAGS:
349
      setattr(self.new_node, attr, getattr(self.op, attr))
350

    
351
    # notify the user about any possible mc promotion
352
    if self.new_node.master_candidate:
353
      self.LogInfo("Node will be a master candidate")
354

    
355
    if self.op.ndparams:
356
      self.new_node.ndparams = self.op.ndparams
357
    else:
358
      self.new_node.ndparams = {}
359

    
360
    if self.op.hv_state:
361
      self.new_node.hv_state_static = self.new_hv_state
362

    
363
    if self.op.disk_state:
364
      self.new_node.disk_state_static = self.new_disk_state
365

    
366
    # Add node to our /etc/hosts, and add key to known_hosts
367
    if self.cfg.GetClusterInfo().modify_etc_hosts:
368
      master_node = self.cfg.GetMasterNode()
369
      result = self.rpc.call_etc_hosts_modify(
370
                 master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
371
                 self.hostname.ip)
372
      result.Raise("Can't update hosts file with new host data")
373

    
374
    if self.new_node.secondary_ip != self.new_node.primary_ip:
375
      _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
376
                               False)
377

    
378
    node_verifier_uuids = [self.cfg.GetMasterNode()]
379
    node_verify_param = {
380
      constants.NV_NODELIST: ([self.new_node.name], {}),
381
      # TODO: do a node-net-test as well?
382
    }
383

    
384
    result = self.rpc.call_node_verify(
385
               node_verifier_uuids, node_verify_param,
386
               self.cfg.GetClusterName(),
387
               self.cfg.GetClusterInfo().hvparams,
388
               {self.new_node.name: self.cfg.LookupNodeGroup(self.op.group)},
389
               self.cfg.GetAllNodeGroupsInfoDict()
390
               )
391
    for verifier in node_verifier_uuids:
392
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
393
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
394
      if nl_payload:
395
        for failed in nl_payload:
396
          feedback_fn("ssh/hostname verification failed"
397
                      " (checking from %s): %s" %
398
                      (verifier, nl_payload[failed]))
399
        raise errors.OpExecError("ssh/hostname verification failed")
400

    
401
    self._InitOpenVSwitch()
402

    
403
    if self.op.readd:
404
      self.context.ReaddNode(self.new_node)
405
      RedistributeAncillaryFiles(self)
406
      # make sure we redistribute the config
407
      self.cfg.Update(self.new_node, feedback_fn)
408
      # and make sure the new node will not have old files around
409
      if not self.new_node.master_candidate:
410
        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
411
        result.Warn("Node failed to demote itself from master candidate status",
412
                    self.LogWarning)
413
    else:
414
      self.context.AddNode(self.new_node, self.proc.GetECId())
415
      RedistributeAncillaryFiles(self)
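  # End to end, this LU backs node addition and re-addition; an invocation
  # would look roughly like "gnt-node add -s 192.0.2.11 node4.example.com"
  # for a dual-homed cluster, or "gnt-node add --readd node4.example.com"
  # to re-add (the command lines are shown only as an illustration).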
416

    
417

    
418
class LUNodeSetParams(LogicalUnit):
419
  """Modifies the parameters of a node.
420

421
  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
422
      to the node role (as _ROLE_*)
423
  @cvar _R2F: a dictionary from node role to tuples of flags
424
  @cvar _FLAGS: a list of attribute names corresponding to the flags
425

426
  """
427
  HPATH = "node-modify"
428
  HTYPE = constants.HTYPE_NODE
429
  REQ_BGL = False
430
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
431
  _F2R = {
432
    (True, False, False): _ROLE_CANDIDATE,
433
    (False, True, False): _ROLE_DRAINED,
434
    (False, False, True): _ROLE_OFFLINE,
435
    (False, False, False): _ROLE_REGULAR,
436
    }
437
  _R2F = dict((v, k) for k, v in _F2R.items())
438
  _FLAGS = ["master_candidate", "drained", "offline"]
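  # Reading the tables above: the flag tuple is (master_candidate, drained,
  # offline), so for example (False, True, False) maps to _ROLE_DRAINED and
  # _R2F[_ROLE_DRAINED] gives (False, True, False) back.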
439

    
440
  def CheckArguments(self):
441
    (self.op.node_uuid, self.op.node_name) = \
442
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
443
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
444
                self.op.master_capable, self.op.vm_capable,
445
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
446
                self.op.disk_state]
447
    if all_mods.count(None) == len(all_mods):
448
      raise errors.OpPrereqError("Please pass at least one modification",
449
                                 errors.ECODE_INVAL)
450
    if all_mods.count(True) > 1:
451
      raise errors.OpPrereqError("Can't set the node into more than one"
452
                                 " state at the same time",
453
                                 errors.ECODE_INVAL)
454

    
455
    # Boolean value that tells us whether we might be demoting from MC
456
    self.might_demote = (self.op.master_candidate is False or
457
                         self.op.offline is True or
458
                         self.op.drained is True or
459
                         self.op.master_capable is False)
460

    
461
    if self.op.secondary_ip:
462
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
463
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
464
                                   " address" % self.op.secondary_ip,
465
                                   errors.ECODE_INVAL)
466

    
467
    self.lock_all = self.op.auto_promote and self.might_demote
468
    self.lock_instances = self.op.secondary_ip is not None
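  # Illustrative outcome of the bookkeeping above (assumed opcode values):
  # an opcode with offline=True and auto_promote=True yields
  # might_demote=True and lock_all=True, while one that only changes
  # ndparams leaves both False and therefore locks only this node.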
469

    
470
  def _InstanceFilter(self, instance):
471
    """Filter for getting affected instances.
472

473
    """
474
    return (instance.disk_template in constants.DTS_INT_MIRROR and
475
            self.op.node_uuid in instance.all_nodes)
476

    
477
  def ExpandNames(self):
478
    if self.lock_all:
479
      self.needed_locks = {
480
        locking.LEVEL_NODE: locking.ALL_SET,
481

    
482
        # Block allocations when all nodes are locked
483
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
484
        }
485
    else:
486
      self.needed_locks = {
487
        locking.LEVEL_NODE: self.op.node_uuid,
488
        }
489

    
490
    # Since modifying a node can have severe effects on currently running
491
    # operations the resource lock is at least acquired in shared mode
492
    self.needed_locks[locking.LEVEL_NODE_RES] = \
493
      self.needed_locks[locking.LEVEL_NODE]
494

    
495
    # Get all locks except nodes in shared mode; they are not used for anything
496
    # but read-only access
497
    self.share_locks = ShareAll()
498
    self.share_locks[locking.LEVEL_NODE] = 0
499
    self.share_locks[locking.LEVEL_NODE_RES] = 0
500
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 0
501

    
502
    if self.lock_instances:
503
      self.needed_locks[locking.LEVEL_INSTANCE] = \
504
        self.cfg.GetInstanceNames(
505
          self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())
506

    
507
  def BuildHooksEnv(self):
508
    """Build hooks env.
509

510
    This runs on the master node.
511

512
    """
513
    return {
514
      "OP_TARGET": self.op.node_name,
515
      "MASTER_CANDIDATE": str(self.op.master_candidate),
516
      "OFFLINE": str(self.op.offline),
517
      "DRAINED": str(self.op.drained),
518
      "MASTER_CAPABLE": str(self.op.master_capable),
519
      "VM_CAPABLE": str(self.op.vm_capable),
520
      }
521

    
522
  def BuildHooksNodes(self):
523
    """Build hooks nodes.
524

525
    """
526
    nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
527
    return (nl, nl)
528

    
529
  def CheckPrereq(self):
530
    """Check prerequisites.
531

532
    This only checks the instance list against the existing names.
533

534
    """
535
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
536
    if self.lock_instances:
537
      affected_instances = \
538
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
539

    
540
      # Verify instance locks
541
      owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
542
      wanted_instance_names = frozenset([inst.name for inst in
543
                                         affected_instances.values()])
544
      if wanted_instance_names - owned_instance_names:
545
        raise errors.OpPrereqError("Instances affected by changing node %s's"
546
                                   " secondary IP address have changed since"
547
                                   " locks were acquired, wanted '%s', have"
548
                                   " '%s'; retry the operation" %
549
                                   (node.name,
550
                                    utils.CommaJoin(wanted_instance_names),
551
                                    utils.CommaJoin(owned_instance_names)),
552
                                   errors.ECODE_STATE)
553
    else:
554
      affected_instances = None
555

    
556
    if (self.op.master_candidate is not None or
557
        self.op.drained is not None or
558
        self.op.offline is not None):
559
      # we can't change the master's node flags
560
      if node.uuid == self.cfg.GetMasterNode():
561
        raise errors.OpPrereqError("The master role can be changed"
562
                                   " only via master-failover",
563
                                   errors.ECODE_INVAL)
564

    
565
    if self.op.master_candidate and not node.master_capable:
566
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
567
                                 " it a master candidate" % node.name,
568
                                 errors.ECODE_STATE)
569

    
570
    if self.op.vm_capable is False:
571
      (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
572
      if ipri or isec:
573
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
574
                                   " the vm_capable flag" % node.name,
575
                                   errors.ECODE_STATE)
576

    
577
    if node.master_candidate and self.might_demote and not self.lock_all:
578
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
579
      # check if after removing the current node, we're missing master
580
      # candidates
581
      (mc_remaining, mc_should, _) = \
582
          self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
583
      if mc_remaining < mc_should:
584
        raise errors.OpPrereqError("Not enough master candidates, please"
585
                                   " pass auto promote option to allow"
586
                                   " promotion (--auto-promote or RAPI"
587
                                   " auto_promote=True)", errors.ECODE_STATE)
588

    
589
    self.old_flags = old_flags = (node.master_candidate,
590
                                  node.drained, node.offline)
591
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
592
    self.old_role = old_role = self._F2R[old_flags]
593

    
594
    # Check for ineffective changes
595
    for attr in self._FLAGS:
596
      if getattr(self.op, attr) is False and getattr(node, attr) is False:
597
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
598
        setattr(self.op, attr, None)
599

    
600
    # Past this point, any flag change to False means a transition
601
    # away from the respective state, as only real changes are kept
602

    
603
    # TODO: We might query the real power state if it supports OOB
604
    if SupportsOob(self.cfg, node):
605
      if self.op.offline is False and not (node.powered or
606
                                           self.op.powered is True):
607
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
608
                                    " offline status can be reset") %
609
                                   self.op.node_name, errors.ECODE_STATE)
610
    elif self.op.powered is not None:
611
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
612
                                  " as it does not support out-of-band"
613
                                  " handling") % self.op.node_name,
614
                                 errors.ECODE_STATE)
615

    
616
    # If we're being de-offlined or un-drained, we'll MC ourselves if needed
617
    if (self.op.drained is False or self.op.offline is False or
618
        (self.op.master_capable and not node.master_capable)):
619
      if _DecideSelfPromotion(self):
620
        self.op.master_candidate = True
621
        self.LogInfo("Auto-promoting node to master candidate")
622

    
623
    # If we're no longer master capable, we'll demote ourselves from MC
624
    if self.op.master_capable is False and node.master_candidate:
625
      self.LogInfo("Demoting from master candidate")
626
      self.op.master_candidate = False
627

    
628
    # Compute new role
629
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
630
    if self.op.master_candidate:
631
      new_role = self._ROLE_CANDIDATE
632
    elif self.op.drained:
633
      new_role = self._ROLE_DRAINED
634
    elif self.op.offline:
635
      new_role = self._ROLE_OFFLINE
636
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
637
      # False is still in new flags, which means we're un-setting (the
638
      # only) True flag
639
      new_role = self._ROLE_REGULAR
640
    else: # no new flags, nothing, keep old role
641
      new_role = old_role
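    # Worked example of the role computation (hypothetical request): a node
    # with old flags (True, False, False), i.e. _ROLE_CANDIDATE, that is
    # given drained=True ends up as _ROLE_DRAINED, while one given only
    # master_candidate=False ends up as _ROLE_REGULAR via the "False in ..."
    # branch.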
642

    
643
    self.new_role = new_role
644

    
645
    if old_role == self._ROLE_OFFLINE and new_role != old_role:
646
      # Trying to transition out of offline status
647
      result = self.rpc.call_version([node.uuid])[node.uuid]
648
      if result.fail_msg:
649
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
650
                                   " to report its version: %s" %
651
                                   (node.name, result.fail_msg),
652
                                   errors.ECODE_STATE)
653
      else:
654
        self.LogWarning("Transitioning node from offline to online state"
655
                        " without using re-add. Please make sure the node"
656
                        " is healthy!")
657

    
658
    # When changing the secondary ip, verify if this is a single-homed to
659
    # multi-homed transition or vice versa, and apply the relevant
660
    # restrictions.
661
    if self.op.secondary_ip:
662
      # Ok even without locking, because this can't be changed by any LU
663
      master = self.cfg.GetMasterNodeInfo()
664
      master_singlehomed = master.secondary_ip == master.primary_ip
665
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
666
        if self.op.force and node.uuid == master.uuid:
667
          self.LogWarning("Transitioning from single-homed to multi-homed"
668
                          " cluster; all nodes will require a secondary IP"
669
                          " address")
670
        else:
671
          raise errors.OpPrereqError("Changing the secondary ip on a"
672
                                     " single-homed cluster requires the"
673
                                     " --force option to be passed, and the"
674
                                     " target node to be the master",
675
                                     errors.ECODE_INVAL)
676
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
677
        if self.op.force and node.uuid == master.uuid:
678
          self.LogWarning("Transitioning from multi-homed to single-homed"
679
                          " cluster; secondary IP addresses will have to be"
680
                          " removed")
681
        else:
682
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
683
                                     " same as the primary IP on a multi-homed"
684
                                     " cluster, unless the --force option is"
685
                                     " passed, and the target node is the"
686
                                     " master", errors.ECODE_INVAL)
687

    
688
      assert not (set([inst.name for inst in affected_instances.values()]) -
689
                  self.owned_locks(locking.LEVEL_INSTANCE))
690

    
691
      if node.offline:
692
        if affected_instances:
693
          msg = ("Cannot change secondary IP address: offline node has"
694
                 " instances (%s) configured to use it" %
695
                 utils.CommaJoin(
696
                   [inst.name for inst in affected_instances.values()]))
697
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
698
      else:
699
        # On online nodes, check that no instances are running, and that
700
        # the node has the new ip and we can reach it.
701
        for instance in affected_instances.values():
702
          CheckInstanceState(self, instance, INSTANCE_DOWN,
703
                             msg="cannot change secondary ip")
704

    
705
        _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
706
        if master.uuid != node.uuid:
707
          # check reachability from master secondary ip to new secondary ip
708
          if not netutils.TcpPing(self.op.secondary_ip,
709
                                  constants.DEFAULT_NODED_PORT,
710
                                  source=master.secondary_ip):
711
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
712
                                       " based ping to node daemon port",
713
                                       errors.ECODE_ENVIRON)
714

    
715
    if self.op.ndparams:
716
      new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
717
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
718
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
719
                           "node", "cluster or group")
720
      self.new_ndparams = new_ndparams
721

    
722
    if self.op.hv_state:
723
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
724
                                                node.hv_state_static)
725

    
726
    if self.op.disk_state:
727
      self.new_disk_state = \
728
        MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)
729

    
730
  def Exec(self, feedback_fn):
731
    """Modifies a node.
732

733
    """
734
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
735
    result = []
736

    
737
    if self.op.ndparams:
738
      node.ndparams = self.new_ndparams
739

    
740
    if self.op.powered is not None:
741
      node.powered = self.op.powered
742

    
743
    if self.op.hv_state:
744
      node.hv_state_static = self.new_hv_state
745

    
746
    if self.op.disk_state:
747
      node.disk_state_static = self.new_disk_state
748

    
749
    for attr in ["master_capable", "vm_capable"]:
750
      val = getattr(self.op, attr)
751
      if val is not None:
752
        setattr(node, attr, val)
753
        result.append((attr, str(val)))
754

    
755
    if self.new_role != self.old_role:
756
      # Tell the node to demote itself, if no longer MC and not offline
757
      if self.old_role == self._ROLE_CANDIDATE and \
758
          self.new_role != self._ROLE_OFFLINE:
759
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
760
        if msg:
761
          self.LogWarning("Node failed to demote itself: %s", msg)
762

    
763
      new_flags = self._R2F[self.new_role]
764
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
765
        if of != nf:
766
          result.append((desc, str(nf)))
767
      (node.master_candidate, node.drained, node.offline) = new_flags
768

    
769
      # if we locked all nodes, adjust the candidate pool before updating
      # this node
770
      if self.lock_all:
771
        AdjustCandidatePool(self, [node.uuid])
772

    
773
    if self.op.secondary_ip:
774
      node.secondary_ip = self.op.secondary_ip
775
      result.append(("secondary_ip", self.op.secondary_ip))
776

    
777
    # this will trigger configuration file update, if needed
778
    self.cfg.Update(node, feedback_fn)
779

    
780
    # this will trigger job queue propagation or cleanup if the mc
781
    # flag changed
782
    if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
783
      self.context.ReaddNode(node)
784

    
785
    return result
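  # The value returned above is a list of (parameter, new value) pairs; an
  # illustrative result for demoting and draining a candidate node would be
  # [("master_candidate", "False"), ("drained", "True")].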
786

    
787

    
788
class LUNodePowercycle(NoHooksLU):
789
  """Powercycles a node.
790

791
  """
792
  REQ_BGL = False
793

    
794
  def CheckArguments(self):
795
    (self.op.node_uuid, self.op.node_name) = \
796
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
797

    
798
    if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
799
      raise errors.OpPrereqError("The node is the master and the force"
800
                                 " parameter was not set",
801
                                 errors.ECODE_INVAL)
802

    
803
  def ExpandNames(self):
804
    """Locking for PowercycleNode.
805

806
    This is a last-resort option and shouldn't block on other
807
    jobs. Therefore, we grab no locks.
808

809
    """
810
    self.needed_locks = {}
811

    
812
  def Exec(self, feedback_fn):
813
    """Reboots a node.
814

815
    """
816
    default_hypervisor = self.cfg.GetHypervisorType()
817
    hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
818
    result = self.rpc.call_node_powercycle(self.op.node_uuid,
819
                                           default_hypervisor,
820
                                           hvparams)
821
    result.Raise("Failed to schedule the reboot")
822
    return result.payload
823

    
824

    
825
def _GetNodeInstancesInner(cfg, fn):
826
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
827

    
828

    
829
def _GetNodePrimaryInstances(cfg, node_uuid):
830
  """Returns primary instances on a node.
831

832
  """
833
  return _GetNodeInstancesInner(cfg,
834
                                lambda inst: node_uuid == inst.primary_node)
835

    
836

    
837
def _GetNodeSecondaryInstances(cfg, node_uuid):
838
  """Returns secondary instances on a node.
839

840
  """
841
  return _GetNodeInstancesInner(cfg,
842
                                lambda inst: node_uuid in inst.secondary_nodes)
843

    
844

    
845
def _GetNodeInstances(cfg, node_uuid):
846
  """Returns a list of all primary and secondary instances on a node.
847

848
  """
849

    
850
  return _GetNodeInstancesInner(cfg, lambda inst: node_uuid in inst.all_nodes)
851

    
852

    
853
class LUNodeEvacuate(NoHooksLU):
854
  """Evacuates instances off a list of nodes.
855

856
  """
857
  REQ_BGL = False
858

    
859
  def CheckArguments(self):
860
    CheckIAllocatorOrNode(self, "iallocator", "remote_node")
861

    
862
  def ExpandNames(self):
863
    (self.op.node_uuid, self.op.node_name) = \
864
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
865

    
866
    if self.op.remote_node is not None:
867
      (self.op.remote_node_uuid, self.op.remote_node) = \
868
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
869
                              self.op.remote_node)
870
      assert self.op.remote_node
871

    
872
      if self.op.node_uuid == self.op.remote_node_uuid:
873
        raise errors.OpPrereqError("Can not use evacuated node as a new"
874
                                   " secondary node", errors.ECODE_INVAL)
875

    
876
      if self.op.mode != constants.NODE_EVAC_SEC:
877
        raise errors.OpPrereqError("Without the use of an iallocator only"
878
                                   " secondary instances can be evacuated",
879
                                   errors.ECODE_INVAL)
880

    
881
    # Declare locks
882
    self.share_locks = ShareAll()
883
    self.needed_locks = {
884
      locking.LEVEL_INSTANCE: [],
885
      locking.LEVEL_NODEGROUP: [],
886
      locking.LEVEL_NODE: [],
887
      }
888

    
889
    # Determine nodes (via group) optimistically, needs verification once locks
890
    # have been acquired
891
    self.lock_nodes = self._DetermineNodes()
892

    
893
  def _DetermineNodes(self):
894
    """Gets the list of node UUIDs to operate on.
895

896
    """
897
    if self.op.remote_node is None:
898
      # Iallocator will choose any node(s) in the same group
899
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
900
    else:
901
      group_nodes = frozenset([self.op.remote_node_uuid])
902

    
903
    # Determine nodes to be locked
904
    return set([self.op.node_uuid]) | group_nodes
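  # Illustrative lock sets (made-up UUIDs): evacuating "node-A" with
  # remote_node "node-B" returns {"node-A", "node-B"}; without a remote node
  # the whole node group of "node-A" is returned, because the iallocator may
  # pick any group member as a target.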
905

    
906
  def _DetermineInstances(self):
907
    """Builds list of instances to operate on.
908

909
    """
910
    assert self.op.mode in constants.NODE_EVAC_MODES
911

    
912
    if self.op.mode == constants.NODE_EVAC_PRI:
913
      # Primary instances only
914
      inst_fn = _GetNodePrimaryInstances
915
      assert self.op.remote_node is None, \
916
        "Evacuating primary instances requires iallocator"
917
    elif self.op.mode == constants.NODE_EVAC_SEC:
918
      # Secondary instances only
919
      inst_fn = _GetNodeSecondaryInstances
920
    else:
921
      # All instances
922
      assert self.op.mode == constants.NODE_EVAC_ALL
923
      inst_fn = _GetNodeInstances
924
      # TODO: In 2.6, change the iallocator interface to take an evacuation mode
925
      # per instance
926
      raise errors.OpPrereqError("Due to an issue with the iallocator"
927
                                 " interface it is not possible to evacuate"
928
                                 " all instances at once; specify explicitly"
929
                                 " whether to evacuate primary or secondary"
930
                                 " instances",
931
                                 errors.ECODE_INVAL)
932

    
933
    return inst_fn(self.cfg, self.op.node_uuid)
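  # Summary of the dispatch above: NODE_EVAC_PRI walks primary instances
  # (iallocator required), NODE_EVAC_SEC walks secondary instances, and
  # NODE_EVAC_ALL is rejected until the iallocator interface supports a
  # per-instance evacuation mode.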
934

    
935
  def DeclareLocks(self, level):
936
    if level == locking.LEVEL_INSTANCE:
937
      # Lock instances optimistically, needs verification once node and group
938
      # locks have been acquired
939
      self.needed_locks[locking.LEVEL_INSTANCE] = \
940
        set(i.name for i in self._DetermineInstances())
941

    
942
    elif level == locking.LEVEL_NODEGROUP:
943
      # Lock node groups for all potential target nodes optimistically, needs
944
      # verification once nodes have been acquired
945
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
946
        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)
947

    
948
    elif level == locking.LEVEL_NODE:
949
      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
950

    
951
  def CheckPrereq(self):
952
    # Verify locks
953
    owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
954
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
955
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
956

    
957
    need_nodes = self._DetermineNodes()
958

    
959
    if not owned_nodes.issuperset(need_nodes):
960
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
961
                                 " locks were acquired, current nodes are"
962
                                 " are '%s', used to be '%s'; retry the"
963
                                 " operation" %
964
                                 (self.op.node_name,
965
                                  utils.CommaJoin(need_nodes),
966
                                  utils.CommaJoin(owned_nodes)),
967
                                 errors.ECODE_STATE)
968

    
969
    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
970
    if owned_groups != wanted_groups:
971
      raise errors.OpExecError("Node groups changed since locks were acquired,"
972
                               " current groups are '%s', used to be '%s';"
973
                               " retry the operation" %
974
                               (utils.CommaJoin(wanted_groups),
975
                                utils.CommaJoin(owned_groups)))
976

    
977
    # Determine affected instances
978
    self.instances = self._DetermineInstances()
979
    self.instance_names = [i.name for i in self.instances]
980

    
981
    if set(self.instance_names) != owned_instance_names:
982
      raise errors.OpExecError("Instances on node '%s' changed since locks"
983
                               " were acquired, current instances are '%s',"
984
                               " used to be '%s'; retry the operation" %
985
                               (self.op.node_name,
986
                                utils.CommaJoin(self.instance_names),
987
                                utils.CommaJoin(owned_instance_names)))
988

    
989
    if self.instance_names:
990
      self.LogInfo("Evacuating instances from node '%s': %s",
991
                   self.op.node_name,
992
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
993
    else:
994
      self.LogInfo("No instances to evacuate from node '%s'",
995
                   self.op.node_name)
996

    
997
    if self.op.remote_node is not None:
998
      for i in self.instances:
999
        if i.primary_node == self.op.remote_node_uuid:
1000
          raise errors.OpPrereqError("Node %s is the primary node of"
1001
                                     " instance %s, cannot use it as"
1002
                                     " secondary" %
1003
                                     (self.op.remote_node, i.name),
1004
                                     errors.ECODE_INVAL)
1005

    
1006
  def Exec(self, feedback_fn):
1007
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)
1008

    
1009
    if not self.instance_names:
1010
      # No instances to evacuate
1011
      jobs = []
1012

    
1013
    elif self.op.iallocator is not None:
1014
      # TODO: Implement relocation to other group
1015
      req = iallocator.IAReqNodeEvac(evac_mode=self.op.mode,
1016
                                     instances=list(self.instance_names))
1017
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)
1018

    
1019
      ial.Run(self.op.iallocator)
1020

    
1021
      if not ial.success:
1022
        raise errors.OpPrereqError("Can't compute node evacuation using"
1023
                                   " iallocator '%s': %s" %
1024
                                   (self.op.iallocator, ial.info),
1025
                                   errors.ECODE_NORES)
1026

    
1027
      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)
1028

    
1029
    elif self.op.remote_node is not None:
1030
      assert self.op.mode == constants.NODE_EVAC_SEC
1031
      jobs = [
1032
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
1033
                                        remote_node=self.op.remote_node,
1034
                                        disks=[],
1035
                                        mode=constants.REPLACE_DISK_CHG,
1036
                                        early_release=self.op.early_release)]
1037
        for instance_name in self.instance_names]
1038

    
1039
    else:
1040
      raise errors.ProgrammerError("No iallocator or remote node")
1041

    
1042
    return ResultWithJobs(jobs)
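  # Shape of the result built above (illustrative): ResultWithJobs receives a
  # list of jobs, each job being a list of opcodes, e.g. in the remote-node
  # case
  #   [[OpInstanceReplaceDisks(instance_name="inst1", ...)],
  #    [OpInstanceReplaceDisks(instance_name="inst2", ...)]]
  # so every instance gets its own single-opcode job.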
1043

    
1044

    
1045
class LUNodeMigrate(LogicalUnit):
1046
  """Migrate all instances from a node.
1047

1048
  """
1049
  HPATH = "node-migrate"
1050
  HTYPE = constants.HTYPE_NODE
1051
  REQ_BGL = False
1052

    
1053
  def CheckArguments(self):
1054
    pass
1055

    
1056
  def ExpandNames(self):
1057
    (self.op.node_uuid, self.op.node_name) = \
1058
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1059

    
1060
    self.share_locks = ShareAll()
1061
    self.needed_locks = {
1062
      locking.LEVEL_NODE: [self.op.node_uuid],
1063
      }
1064

    
1065
  def BuildHooksEnv(self):
1066
    """Build hooks env.
1067

1068
    This runs on the master, the primary and all the secondaries.
1069

1070
    """
1071
    return {
1072
      "NODE_NAME": self.op.node_name,
1073
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
1074
      }
1075

    
1076
  def BuildHooksNodes(self):
1077
    """Build hooks nodes.
1078

1079
    """
1080
    nl = [self.cfg.GetMasterNode()]
1081
    return (nl, nl)
1082

    
1083
  def CheckPrereq(self):
1084
    pass
1085

    
1086
  def Exec(self, feedback_fn):
1087
    # Prepare jobs for migration instances
1088
    jobs = [
1089
      [opcodes.OpInstanceMigrate(
1090
        instance_name=inst.name,
1091
        mode=self.op.mode,
1092
        live=self.op.live,
1093
        iallocator=self.op.iallocator,
1094
        target_node=self.op.target_node,
1095
        allow_runtime_changes=self.op.allow_runtime_changes,
1096
        ignore_ipolicy=self.op.ignore_ipolicy)]
1097
      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]
1098

    
1099
    # TODO: Run iallocator in this opcode and pass correct placement options to
1100
    # OpInstanceMigrate. Since other jobs can modify the cluster between
1101
    # running the iallocator and the actual migration, a good consistency model
1102
    # will have to be found.
1103

    
1104
    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
1105
            frozenset([self.op.node_uuid]))
1106

    
1107
    return ResultWithJobs(jobs)
1108

    
1109

    
1110
def _GetStorageTypeArgs(cfg, storage_type):
1111
  """Returns the arguments for a storage type.
1112

1113
  """
1114
  # Special case for file storage
1115

    
1116
  if storage_type == constants.ST_FILE:
1117
    return [[cfg.GetFileStorageDir()]]
1118
  elif storage_type == constants.ST_SHARED_FILE:
1119
    dts = cfg.GetClusterInfo().enabled_disk_templates
1120
    paths = []
1121
    if constants.DT_SHARED_FILE in dts:
1122
      paths.append(cfg.GetSharedFileStorageDir())
1123
    if constants.DT_GLUSTER in dts:
1124
      paths.append(cfg.GetGlusterStorageDir())
1125
    return [paths]
1126
  else:
1127
    return []
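# Illustrative return values (the path is made up): ST_FILE yields
# [["/srv/ganeti/file-storage"]], ST_SHARED_FILE yields a single list holding
# whichever of the shared-file and Gluster directories are enabled, and any
# other storage type yields [].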
1128

    
1129

    
1130
class LUNodeModifyStorage(NoHooksLU):
1131
  """Logical unit for modifying a storage volume on a node.
1132

1133
  """
1134
  REQ_BGL = False
1135

    
1136
  def CheckArguments(self):
1137
    (self.op.node_uuid, self.op.node_name) = \
1138
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1139

    
1140
    storage_type = self.op.storage_type
1141

    
1142
    try:
1143
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
1144
    except KeyError:
1145
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
1146
                                 " modified" % storage_type,
1147
                                 errors.ECODE_INVAL)
1148

    
1149
    diff = set(self.op.changes.keys()) - modifiable
1150
    if diff:
1151
      raise errors.OpPrereqError("The following fields can not be modified for"
1152
                                 " storage units of type '%s': %r" %
1153
                                 (storage_type, list(diff)),
1154
                                 errors.ECODE_INVAL)
1155

    
1156
  def CheckPrereq(self):
1157
    """Check prerequisites.
1158

1159
    """
1160
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1161

    
1162
  def ExpandNames(self):
1163
    self.needed_locks = {
1164
      locking.LEVEL_NODE: self.op.node_uuid,
1165
      }
1166

    
1167
  def Exec(self, feedback_fn):
1168
    """Computes the list of nodes and their attributes.
1169

1170
    """
1171
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1172
    result = self.rpc.call_storage_modify(self.op.node_uuid,
1173
                                          self.op.storage_type, st_args,
1174
                                          self.op.name, self.op.changes)
1175
    result.Raise("Failed to modify storage unit '%s' on %s" %
1176
                 (self.op.name, self.op.node_name))
1177

    
1178

    
1179
def _CheckOutputFields(fields, selected):
1180
  """Checks whether all selected fields are valid according to fields.
1181

1182
  @type fields: L{utils.FieldSet}
1183
  @param fields: fields set
1184
  @type selected: L{utils.FieldSet}
1185
  @param selected: fields set
1186

1187
  """
1188
  delta = fields.NonMatching(selected)
1189
  if delta:
1190
    raise errors.OpPrereqError("Unknown output fields selected: %s"
1191
                               % ",".join(delta), errors.ECODE_INVAL)
1192

    
1193

    
1194
class LUNodeQueryvols(NoHooksLU):
1195
  """Logical unit for getting volumes on node(s).
1196

1197
  """
1198
  REQ_BGL = False
1199

    
1200
  def CheckArguments(self):
1201
    _CheckOutputFields(utils.FieldSet(constants.VF_NODE, constants.VF_PHYS,
1202
                                      constants.VF_VG, constants.VF_NAME,
1203
                                      constants.VF_SIZE, constants.VF_INSTANCE),
1204
                       self.op.output_fields)
1205

    
1206
  def ExpandNames(self):
1207
    self.share_locks = ShareAll()
1208

    
1209
    if self.op.nodes:
1210
      self.needed_locks = {
1211
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
1212
        }
1213
    else:
1214
      self.needed_locks = {
1215
        locking.LEVEL_NODE: locking.ALL_SET,
1216
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1217
        }
1218

    
1219
  def Exec(self, feedback_fn):
1220
    """Computes the list of nodes and their attributes.
1221

1222
    """
1223
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
1224
    volumes = self.rpc.call_node_volumes(node_uuids)
1225

    
1226
    ilist = self.cfg.GetAllInstancesInfo()
1227
    vol2inst = MapInstanceLvsToNodes(ilist.values())
1228

    
1229
    output = []
1230
    for node_uuid in node_uuids:
1231
      nresult = volumes[node_uuid]
1232
      if nresult.offline:
1233
        continue
1234
      msg = nresult.fail_msg
1235
      if msg:
1236
        self.LogWarning("Can't compute volume data on node %s: %s",
1237
                        self.cfg.GetNodeName(node_uuid), msg)
1238
        continue
1239

    
1240
      node_vols = sorted(nresult.payload,
1241
                         key=operator.itemgetter(constants.VF_DEV))
1242

    
1243
      for vol in node_vols:
1244
        node_output = []
1245
        for field in self.op.output_fields:
1246
          if field == constants.VF_NODE:
1247
            val = self.cfg.GetNodeName(node_uuid)
1248
          elif field == constants.VF_PHYS:
1249
            val = vol[constants.VF_DEV]
1250
          elif field == constants.VF_VG:
1251
            val = vol[constants.VF_VG]
1252
          elif field == constants.VF_NAME:
1253
            val = vol[constants.VF_NAME]
1254
          elif field == constants.VF_SIZE:
1255
            val = int(float(vol[constants.VF_SIZE]))
1256
          elif field == constants.VF_INSTANCE:
1257
            inst = vol2inst.get((node_uuid, vol[constants.VF_VG] + "/" +
1258
                                 vol[constants.VF_NAME]), None)
1259
            if inst is not None:
1260
              val = inst.name
1261
            else:
1262
              val = "-"
1263
          else:
1264
            raise errors.ParameterError(field)
1265
          node_output.append(str(val))
1266

    
1267
        output.append(node_output)
1268

    
1269
    return output
1270

    
1271

    
1272
class LUNodeQueryStorage(NoHooksLU):
1273
  """Logical unit for getting information on storage units on node(s).
1274

1275
  """
1276
  REQ_BGL = False
1277

    
1278
  def CheckArguments(self):
1279
    _CheckOutputFields(utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
1280
                       self.op.output_fields)
1281

    
1282
  def ExpandNames(self):
1283
    self.share_locks = ShareAll()
1284

    
1285
    if self.op.nodes:
1286
      self.needed_locks = {
1287
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
1288
        }
1289
    else:
1290
      self.needed_locks = {
1291
        locking.LEVEL_NODE: locking.ALL_SET,
1292
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1293
        }
1294

    
1295
  def _DetermineStorageType(self):
    """Determines the default storage type of the cluster.

    """
    enabled_disk_templates = self.cfg.GetClusterInfo().enabled_disk_templates
    default_storage_type = \
        constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[enabled_disk_templates[0]]
    return default_storage_type

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.storage_type:
      CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()
      supported_storage_types = constants.STS_REPORT_NODE_STORAGE
      if self.storage_type not in supported_storage_types:
        raise errors.OpPrereqError(
            "Storage reporting for storage type '%s' is not supported. Please"
            " use the --storage-type option to specify one of the supported"
            " storage types (%s) or set the default disk template to one that"
            " supports storage reporting." %
            (self.storage_type, utils.CommaJoin(supported_storage_types)))

  def Exec(self, feedback_fn):
    """Computes the list of storage units and their attributes.

    """
    if self.op.storage_type:
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()

    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

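    # Query all locked nodes in one RPC call; each node returns one row per
    # storage unit, with the columns in the same order as the requested
    # fields.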
    st_args = _GetStorageTypeArgs(self.cfg, self.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      nresult = data[node_uuid]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s",
                        node_name, msg)
        continue

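      # Key the returned rows by storage unit name so they can be emitted
      # in NiceSort order below.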
      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node_name
          elif field == constants.SF_TYPE:
            val = self.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result


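# For orientation only: an illustrative (and hypothetical) way to reach the
# node removal implemented below through the usual opcode interface would be
#
#   op = opcodes.OpNodeRemove(node_name="node4.example.com")
#   result = cli.SubmitOpCode(op)
#
# where the node name is made up; "gnt-node remove" is the usual CLI
# equivalent.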
class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_uuid)
    except ValueError:
      pass
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    assert node is not None

    masternode = self.cfg.GetMasterNode()
    if node.uuid == masternode:
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)

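    # The node must not be the primary or secondary node of any instance;
    # instance.all_nodes covers both roles.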
    for _, instance in self.cfg.GetAllInstancesInfo().items():
      if node.uuid in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first" % instance.name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    logging.info("Stopping the node daemon and removing configs from node %s",
                 self.node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # Promote nodes to master candidate as needed
    AdjustCandidatePool(self, exceptions=[self.node.uuid])
    self.context.RemoveNode(self.node)

    # Run post hooks on the node before it's removed
    RunPostHook(self, self.node.name)

    # we have to call this by name rather than by UUID, as the node is no
    # longer in the config
    result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node_uuid = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node_uuid,
                                              constants.ETC_HOSTS_REMOVE,
                                              self.node.name, None)
      result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)


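# Illustrative note (the CLI/opcode frontend lives outside this module): the
# LU below implements the storage repair operation, i.e. it asks the target
# node to run the SO_FIX_CONSISTENCY operation on one storage unit.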
class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    if (constants.SO_FIX_CONSISTENCY not in
        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " repaired" % storage_type,
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def _CheckFaultyDisks(self, instance, node_uuid):
    """Ensure faulty disks abort the opcode or at least warn."""
    try:
      if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                 node_uuid, True):
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                   " node '%s'" %
                                   (instance.name,
                                    self.cfg.GetNodeName(node_uuid)),
                                   errors.ECODE_STATE)
    except errors.OpPrereqError, err:
1521
        self.LogWarning(str(err.args[0]))
1522
      else:
1523
        raise
1524

    
1525
  def CheckPrereq(self):
1526
    """Check prerequisites.
1527

1528
    """
1529
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1530

    
1531
    # Check whether any instance on this node has faulty disks
1532
    for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
1533
      if not inst.disks_active:
1534
        continue
1535
      check_nodes = set(inst.all_nodes)
1536
      check_nodes.discard(self.op.node_uuid)
1537
      for inst_node_uuid in check_nodes:
1538
        self._CheckFaultyDisks(inst, inst_node_uuid)
1539

    
1540
  def Exec(self, feedback_fn):
1541
    feedback_fn("Repairing storage unit '%s' on %s ..." %
1542
                (self.op.name, self.op.node_name))
1543

    
1544
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1545
    result = self.rpc.call_storage_execute(self.op.node_uuid,
1546
                                           self.op.storage_type, st_args,
1547
                                           self.op.name,
1548
                                           constants.SO_FIX_CONSISTENCY)
1549
    result.Raise("Failed to repair storage unit '%s' on %s" %
1550
                 (self.op.name, self.op.node_name))