# lib/cmdlib/node.py (revision 4922cd73)
#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with nodes."""

import logging
import operator

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import qlang
from ganeti import query
from ganeti import rpc
from ganeti import utils
from ganeti.masterd import iallocator

from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
  ResultWithJobs
from ganeti.cmdlib.common import CheckParamsNotGlobal, \
  MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  IsExclusiveStorageEnabledNode, CheckNodePVs, \
  RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
  CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
  AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
  GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
  FindFaultyInstanceDisks, CheckStorageTypeEnabled


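# Note on the arithmetic in _DecideSelfPromotion below: with, say,
# candidate_pool_size = 10 and mc_now = mc_should = 3, the new node bumps the
# target to min(3 + 1, 10) = 4, and 3 < 4 means "promote myself".  Once the
# pool is full (mc_now == cp_size) the comparison is always False.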
def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max by one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should


def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: L{objects.Node}
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip,
  and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  # this can be called with a new node, which has no UUID yet, so perform the
  # RPC call using its name
  result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node.name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)


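# Sketch of how this LU is usually reached (assuming the standard opcode
# naming): "gnt-node add" submits an opcodes.OpNodeAdd, the master daemon
# dispatches it to this logical unit, and the base class then runs
# CheckArguments, CheckPrereq and Exec in that order, with the hooks described
# by BuildHooksEnv/BuildHooksNodes fired around Exec.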
class LUNodeAdd(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _NFLAGS = ["master_capable", "vm_capable"]

  def CheckArguments(self):
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
    # validate/normalize the node name
    self.hostname = netutils.GetHostname(name=self.op.node_name,
                                         family=self.primary_ip_family)
    self.op.node_name = self.hostname.name

    if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
      raise errors.OpPrereqError("Cannot readd the master node",
                                 errors.ECODE_STATE)

    if self.op.readd and self.op.group:
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                 " being readded", errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    hook_nodes = self.cfg.GetNodeList()
    new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
    if new_node_info is not None:
      # Exclude added node
      hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))

    # add the new node as post hook node by name; it does not have a UUID yet
    return (hook_nodes, hook_nodes)

  def PreparePostHookNodes(self, post_hook_node_uuids):
    return post_hook_node_uuids + [self.new_node.uuid]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.hostname.name
    self.op.primary_ip = self.hostname.ip
    if self.op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
                                   " IPv4 address must be given as secondary",
                                   errors.ECODE_INVAL)
      self.op.secondary_ip = self.op.primary_ip

    secondary_ip = self.op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip, errors.ECODE_INVAL)

    existing_node_info = self.cfg.GetNodeInfoByName(node_name)
    if not self.op.readd and existing_node_info is not None:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node_name, errors.ECODE_EXISTS)
    elif self.op.readd and existing_node_info is None:
      raise errors.OpPrereqError("Node %s is not in the configuration" %
                                 node_name, errors.ECODE_NOENT)

    self.changed_primary_ip = False

    for existing_node in self.cfg.GetAllNodesInfo().values():
      if self.op.readd and node_name == existing_node.name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != self.op.primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == self.op.primary_ip or
          existing_node.secondary_ip == self.op.primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # After this 'if' block, None is no longer a valid value for the
    # _capable op attributes
    if self.op.readd:
      assert existing_node_info is not None, \
        "Can't retrieve locked node %s" % node_name
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, getattr(existing_node_info, attr))
    else:
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, True)

    if self.op.readd and not self.op.vm_capable:
      pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % node_name,
                                   errors.ECODE_STATE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = self.cfg.GetMasterNodeInfo()
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == self.op.primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                              source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [existing_node_info.uuid]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False

    if self.op.readd:
      self.new_node = existing_node_info
    else:
      node_group = self.cfg.LookupNodeGroup(self.op.group)
      self.new_node = objects.Node(name=node_name,
                                   primary_ip=self.op.primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=node_group, ndparams={})

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)

    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
    #       it a property on the base class.
    rpcrunner = rpc.DnsOnlyRunner()
    result = rpcrunner.call_version([node_name])[node_name]
    result.Raise("Can't get version information from node %s" % node_name,
                 prereq=True)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node_name, result.payload)
    else:
      raise errors.OpPrereqError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.payload),
                                 errors.ECODE_ENVIRON)

    vg_name = self.cfg.GetVGName()
    if vg_name is not None:
      vparams = {constants.NV_PVLIST: [vg_name]}
      excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
      cname = self.cfg.GetClusterName()
      result = rpcrunner.call_node_verify_light(
          [node_name], vparams, cname,
          self.cfg.GetClusterInfo().hvparams)[node_name]
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
      if errmsgs:
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)

  def _InitOpenVSwitch(self):
    filled_ndparams = self.cfg.GetClusterInfo().FillND(
      self.new_node, self.cfg.GetNodeGroup(self.new_node.group))

    ovs = filled_ndparams.get(constants.ND_OVS, None)
    ovs_name = filled_ndparams.get(constants.ND_OVS_NAME, None)
    ovs_link = filled_ndparams.get(constants.ND_OVS_LINK, None)

    if ovs:
      if not ovs_link:
        self.LogInfo("No physical interface for OpenvSwitch was given."
                     " OpenvSwitch will not have an outside connection. This"
                     " might not be what you want.")

      result = self.rpc.call_node_configure_ovs(
                 self.new_node.name, ovs_name, ovs_link)
      result.Raise("Failed to initialize OpenVSwitch on new node")

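  # Overview of Exec below: reset offline/drained flags on re-adds, copy the
  # *_capable flags, optionally update /etc/hosts, verify the secondary IP and
  # ssh/hostname reachability from the master, set up OpenvSwitch if
  # configured, and finally add (or re-add) the node to the configuration and
  # redistribute the ancillary files.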
  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # We are adding a new node, so we assume it's powered
    self.new_node.powered = True

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      self.new_node.offline = False
      self.new_node.drained = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      self.new_node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        self.new_node.primary_ip = self.op.primary_ip

    # copy the master/vm_capable flags
    for attr in self._NFLAGS:
      setattr(self.new_node, attr, getattr(self.op, attr))

    # notify the user about any possible mc promotion
    if self.new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    if self.op.ndparams:
      self.new_node.ndparams = self.op.ndparams
    else:
      self.new_node.ndparams = {}

    if self.op.hv_state:
      self.new_node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.new_node.disk_state_static = self.new_disk_state

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(
                 master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
                 self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    if self.new_node.secondary_ip != self.new_node.primary_ip:
      _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
                               False)

    node_verifier_uuids = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: ([self.new_node.name], {}),
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(
               node_verifier_uuids, node_verify_param,
               self.cfg.GetClusterName(),
               self.cfg.GetClusterInfo().hvparams)
    for verifier in node_verifier_uuids:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed")

    self._InitOpenVSwitch()

    if self.op.readd:
      self.context.ReaddNode(self.new_node)
      RedistributeAncillaryFiles(self)
      # make sure we redistribute the config
      self.cfg.Update(self.new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not self.new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
        result.Warn("Node failed to demote itself from master candidate status",
                    self.LogWarning)
    else:
      self.context.AddNode(self.new_node, self.proc.GetECId())
      RedistributeAncillaryFiles(self)


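# Example of the role encoding used by LUNodeSetParams: a request with
# offline=True corresponds to the flag tuple (False, False, True), which _F2R
# maps to _ROLE_OFFLINE; clearing the only set flag maps back to _ROLE_REGULAR.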
class LUNodeSetParams(LogicalUnit):
  """Modifies the parameters of a node.

  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
      to the node role (as _ROLE_*)
  @cvar _R2F: a dictionary from node role to tuples of flags
  @cvar _FLAGS: a list of attribute names corresponding to the flags

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
  _F2R = {
    (True, False, False): _ROLE_CANDIDATE,
    (False, True, False): _ROLE_DRAINED,
    (False, False, True): _ROLE_OFFLINE,
    (False, False, False): _ROLE_REGULAR,
    }
  _R2F = dict((v, k) for k, v in _F2R.items())
  _FLAGS = ["master_candidate", "drained", "offline"]

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
                self.op.master_capable, self.op.vm_capable,
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
                self.op.disk_state]
    if all_mods.count(None) == len(all_mods):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # Boolean value that tells us whether we might be demoting from MC
    self.might_demote = (self.op.master_candidate is False or
                         self.op.offline is True or
                         self.op.drained is True or
                         self.op.master_capable is False)

    if self.op.secondary_ip:
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                   " address" % self.op.secondary_ip,
                                   errors.ECODE_INVAL)

    self.lock_all = self.op.auto_promote and self.might_demote
    self.lock_instances = self.op.secondary_ip is not None

  def _InstanceFilter(self, instance):
    """Filter for getting affected instances.

    """
    return (instance.disk_template in constants.DTS_INT_MIRROR and
            self.op.node_uuid in instance.all_nodes)

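  # Locking strategy (decided in CheckArguments): lock every node when a
  # demotion might require auto-promoting another candidate, and lock the
  # mirrored (DTS_INT_MIRROR, e.g. DRBD) instances using this node when the
  # secondary IP changes, since their configuration references that address.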
  def ExpandNames(self):
    if self.lock_all:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,

        # Block allocations when all nodes are locked
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: self.op.node_uuid,
        }

    # Since modifying a node can have severe effects on currently running
    # operations the resource lock is at least acquired in shared mode
    self.needed_locks[locking.LEVEL_NODE_RES] = \
      self.needed_locks[locking.LEVEL_NODE]

    # Get all locks except nodes in shared mode; they are not used for anything
    # but read-only access
    self.share_locks = ShareAll()
    self.share_locks[locking.LEVEL_NODE] = 0
    self.share_locks[locking.LEVEL_NODE_RES] = 0
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 0

    if self.lock_instances:
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(
          self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
    return (nl, nl)

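  # CheckPrereq below, roughly: re-verify the optimistic instance locks,
  # refuse to change the master's own flags, make sure enough master
  # candidates remain (or that --auto-promote was given), drop no-op flag
  # changes, compute old_role/new_role from the flag tuples, and validate a
  # secondary IP change against the single-/multi-homed rules.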
  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    if self.lock_instances:
      affected_instances = \
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)

      # Verify instance locks
      owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
      wanted_instance_names = frozenset([inst.name for inst in
                                         affected_instances.values()])
      if wanted_instance_names - owned_instance_names:
        raise errors.OpPrereqError("Instances affected by changing node %s's"
                                   " secondary IP address have changed since"
                                   " locks were acquired, wanted '%s', have"
                                   " '%s'; retry the operation" %
                                   (node.name,
                                    utils.CommaJoin(wanted_instance_names),
                                    utils.CommaJoin(owned_instance_names)),
                                   errors.ECODE_STATE)
    else:
      affected_instances = None

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if node.uuid == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via master-failover",
                                   errors.ECODE_INVAL)

    if self.op.master_candidate and not node.master_capable:
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
                                 " it a master candidate" % node.name,
                                 errors.ECODE_STATE)

    if self.op.vm_capable is False:
      (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
      if ipri or isec:
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
                                   " the vm_capable flag" % node.name,
                                   errors.ECODE_STATE)

    if node.master_candidate and self.might_demote and not self.lock_all:
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
      # check if after removing the current node, we're missing master
      # candidates
      (mc_remaining, mc_should, _) = \
          self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
      if mc_remaining < mc_should:
        raise errors.OpPrereqError("Not enough master candidates, please"
                                   " pass auto promote option to allow"
                                   " promotion (--auto-promote or RAPI"
                                   " auto_promote=True)", errors.ECODE_STATE)

    self.old_flags = old_flags = (node.master_candidate,
                                  node.drained, node.offline)
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
    self.old_role = old_role = self._F2R[old_flags]

    # Check for ineffective changes
    for attr in self._FLAGS:
      if getattr(self.op, attr) is False and getattr(node, attr) is False:
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
        setattr(self.op, attr, None)

    # Past this point, any flag change to False means a transition
    # away from the respective state, as only real changes are kept

    # TODO: We might query the real power state if it supports OOB
    if SupportsOob(self.cfg, node):
      if self.op.offline is False and not (node.powered or
                                           self.op.powered is True):
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
                                    " offline status can be reset") %
                                   self.op.node_name, errors.ECODE_STATE)
    elif self.op.powered is not None:
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
                                  " as it does not support out-of-band"
                                  " handling") % self.op.node_name,
                                 errors.ECODE_STATE)

    # If we're being deofflined/drained, we'll MC ourself if needed
    if (self.op.drained is False or self.op.offline is False or
        (self.op.master_capable and not node.master_capable)):
      if _DecideSelfPromotion(self):
        self.op.master_candidate = True
        self.LogInfo("Auto-promoting node to master candidate")

    # If we're no longer master capable, we'll demote ourselves from MC
    if self.op.master_capable is False and node.master_candidate:
      self.LogInfo("Demoting from master candidate")
      self.op.master_candidate = False

    # Compute new role
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
    if self.op.master_candidate:
      new_role = self._ROLE_CANDIDATE
    elif self.op.drained:
      new_role = self._ROLE_DRAINED
    elif self.op.offline:
      new_role = self._ROLE_OFFLINE
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
      # False is still in new flags, which means we're un-setting (the
      # only) True flag
      new_role = self._ROLE_REGULAR
    else: # no new flags, nothing, keep old role
      new_role = old_role

    self.new_role = new_role

    if old_role == self._ROLE_OFFLINE and new_role != old_role:
      # Trying to transition out of offline status
      result = self.rpc.call_version([node.uuid])[node.uuid]
      if result.fail_msg:
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
                                   " to report its version: %s" %
                                   (node.name, result.fail_msg),
                                   errors.ECODE_STATE)
      else:
        self.LogWarning("Transitioning node from offline to online state"
                        " without using re-add. Please make sure the node"
                        " is healthy!")

    # When changing the secondary ip, verify if this is a single-homed to
    # multi-homed transition or vice versa, and apply the relevant
    # restrictions.
    if self.op.secondary_ip:
      # Ok even without locking, because this can't be changed by any LU
      master = self.cfg.GetMasterNodeInfo()
      master_singlehomed = master.secondary_ip == master.primary_ip
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from single-homed to multi-homed"
                          " cluster; all nodes will require a secondary IP"
                          " address")
        else:
          raise errors.OpPrereqError("Changing the secondary ip on a"
                                     " single-homed cluster requires the"
                                     " --force option to be passed, and the"
                                     " target node to be the master",
                                     errors.ECODE_INVAL)
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from multi-homed to single-homed"
                          " cluster; secondary IP addresses will have to be"
                          " removed")
        else:
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
                                     " same as the primary IP on a multi-homed"
                                     " cluster, unless the --force option is"
                                     " passed, and the target node is the"
                                     " master", errors.ECODE_INVAL)

      assert not (set([inst.name for inst in affected_instances.values()]) -
                  self.owned_locks(locking.LEVEL_INSTANCE))

      if node.offline:
        if affected_instances:
          msg = ("Cannot change secondary IP address: offline node has"
                 " instances (%s) configured to use it" %
                 utils.CommaJoin(
                   [inst.name for inst in affected_instances.values()]))
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
      else:
        # On online nodes, check that no instances are running, and that
        # the node has the new ip and we can reach it.
        for instance in affected_instances.values():
          CheckInstanceState(self, instance, INSTANCE_DOWN,
                             msg="cannot change secondary ip")

        _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
        if master.uuid != node.uuid:
          # check reachability from master secondary ip to new secondary ip
          if not netutils.TcpPing(self.op.secondary_ip,
                                  constants.DEFAULT_NODED_PORT,
                                  source=master.secondary_ip):
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                       " based ping to node daemon port",
                                       errors.ECODE_ENVIRON)

    if self.op.ndparams:
      new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")
      self.new_ndparams = new_ndparams

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                                node.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    result = []

    if self.op.ndparams:
      node.ndparams = self.new_ndparams

    if self.op.powered is not None:
      node.powered = self.op.powered

    if self.op.hv_state:
      node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      node.disk_state_static = self.new_disk_state

    for attr in ["master_capable", "vm_capable"]:
      val = getattr(self.op, attr)
      if val is not None:
        setattr(node, attr, val)
        result.append((attr, str(val)))

    if self.new_role != self.old_role:
      # Tell the node to demote itself, if no longer MC and not offline
      if self.old_role == self._ROLE_CANDIDATE and \
          self.new_role != self._ROLE_OFFLINE:
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s", msg)

      new_flags = self._R2F[self.new_role]
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
        if of != nf:
          result.append((desc, str(nf)))
      (node.master_candidate, node.drained, node.offline) = new_flags

      # we locked all nodes, we adjust the CP before updating this node
      if self.lock_all:
        AdjustCandidatePool(self, [node.uuid])

    if self.op.secondary_ip:
      node.secondary_ip = self.op.secondary_ip
      result.append(("secondary_ip", self.op.secondary_ip))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)

    # this will trigger job queue propagation or cleanup if the mc
    # flag changed
    if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
      self.context.ReaddNode(node)

    return result


class LUNodePowercycle(NoHooksLU):
  """Powercycles a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    default_hypervisor = self.cfg.GetHypervisorType()
    hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
    result = self.rpc.call_node_powercycle(self.op.node_uuid,
                                           default_hypervisor,
                                           hvparams)
    result.Raise("Failed to schedule the reboot")
    return result.payload


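# The three helpers below all funnel through _GetNodeInstancesInner and only
# differ in the predicate: primary instances of a node, instances using it as
# a secondary, or both (inst.all_nodes).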
def _GetNodeInstancesInner(cfg, fn):
826
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
827

    
828

    
829
def _GetNodePrimaryInstances(cfg, node_uuid):
830
  """Returns primary instances on a node.
831

832
  """
833
  return _GetNodeInstancesInner(cfg,
834
                                lambda inst: node_uuid == inst.primary_node)
835

    
836

    
837
def _GetNodeSecondaryInstances(cfg, node_uuid):
838
  """Returns secondary instances on a node.
839

840
  """
841
  return _GetNodeInstancesInner(cfg,
842
                                lambda inst: node_uuid in inst.secondary_nodes)
843

    
844

    
845
def _GetNodeInstances(cfg, node_uuid):
846
  """Returns a list of all primary and secondary instances on a node.
847

848
  """
849

    
850
  return _GetNodeInstancesInner(cfg, lambda inst: node_uuid in inst.all_nodes)
851

    
852

    
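# LUNodeEvacuate follows the optimistic-locking pattern: ExpandNames and
# DeclareLocks guess the node group, nodes and instances to lock, and
# CheckPrereq recomputes the same sets and asks the caller to retry if the
# cluster changed in between.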
class LUNodeEvacuate(NoHooksLU):
  """Evacuates instances off a list of nodes.

  """
  REQ_BGL = False

  def CheckArguments(self):
    CheckIAllocatorOrNode(self, "iallocator", "remote_node")

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)
      assert self.op.remote_node

      if self.op.node_uuid == self.op.remote_node_uuid:
        raise errors.OpPrereqError("Can not use evacuated node as a new"
                                   " secondary node", errors.ECODE_INVAL)

      if self.op.mode != constants.NODE_EVAC_SEC:
        raise errors.OpPrereqError("Without the use of an iallocator only"
                                   " secondary instances can be evacuated",
                                   errors.ECODE_INVAL)

    # Declare locks
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

    # Determine nodes (via group) optimistically, needs verification once locks
    # have been acquired
    self.lock_nodes = self._DetermineNodes()

  def _DetermineNodes(self):
    """Gets the list of node UUIDs to operate on.

    """
    if self.op.remote_node is None:
      # Iallocator will choose any node(s) in the same group
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
    else:
      group_nodes = frozenset([self.op.remote_node_uuid])

    # Determine nodes to be locked
    return set([self.op.node_uuid]) | group_nodes

  def _DetermineInstances(self):
    """Builds list of instances to operate on.

    """
    assert self.op.mode in constants.NODE_EVAC_MODES

    if self.op.mode == constants.NODE_EVAC_PRI:
      # Primary instances only
      inst_fn = _GetNodePrimaryInstances
      assert self.op.remote_node is None, \
        "Evacuating primary instances requires iallocator"
    elif self.op.mode == constants.NODE_EVAC_SEC:
      # Secondary instances only
      inst_fn = _GetNodeSecondaryInstances
    else:
      # All instances
      assert self.op.mode == constants.NODE_EVAC_ALL
      inst_fn = _GetNodeInstances
      # TODO: In 2.6, change the iallocator interface to take an evacuation mode
      # per instance
      raise errors.OpPrereqError("Due to an issue with the iallocator"
                                 " interface it is not possible to evacuate"
                                 " all instances at once; specify explicitly"
                                 " whether to evacuate primary or secondary"
                                 " instances",
                                 errors.ECODE_INVAL)

    return inst_fn(self.cfg, self.op.node_uuid)

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        set(i.name for i in self._DetermineInstances())

    elif level == locking.LEVEL_NODEGROUP:
      # Lock node groups for all potential target nodes optimistically, needs
      # verification once nodes have been acquired
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)

    elif level == locking.LEVEL_NODE:
      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes

  def CheckPrereq(self):
    # Verify locks
    owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)

    need_nodes = self._DetermineNodes()

    if not owned_nodes.issuperset(need_nodes):
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
                                 " locks were acquired, current nodes are"
                                 " '%s', used to be '%s'; retry the"
                                 " operation" %
                                 (self.op.node_name,
                                  utils.CommaJoin(need_nodes),
                                  utils.CommaJoin(owned_nodes)),
                                 errors.ECODE_STATE)

    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
    if owned_groups != wanted_groups:
      raise errors.OpExecError("Node groups changed since locks were acquired,"
                               " current groups are '%s', used to be '%s';"
                               " retry the operation" %
                               (utils.CommaJoin(wanted_groups),
                                utils.CommaJoin(owned_groups)))

    # Determine affected instances
    self.instances = self._DetermineInstances()
    self.instance_names = [i.name for i in self.instances]

    if set(self.instance_names) != owned_instance_names:
      raise errors.OpExecError("Instances on node '%s' changed since locks"
                               " were acquired, current instances are '%s',"
                               " used to be '%s'; retry the operation" %
                               (self.op.node_name,
                                utils.CommaJoin(self.instance_names),
                                utils.CommaJoin(owned_instance_names)))

    if self.instance_names:
      self.LogInfo("Evacuating instances from node '%s': %s",
                   self.op.node_name,
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
    else:
      self.LogInfo("No instances to evacuate from node '%s'",
                   self.op.node_name)

    if self.op.remote_node is not None:
      for i in self.instances:
        if i.primary_node == self.op.remote_node_uuid:
          raise errors.OpPrereqError("Node %s is the primary node of"
                                     " instance %s, cannot use it as"
                                     " secondary" %
                                     (self.op.remote_node, i.name),
                                     errors.ECODE_INVAL)

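  # Exec does not move anything itself: it either asks the iallocator for an
  # evacuation plan (IAReqNodeEvac) or builds one OpInstanceReplaceDisks per
  # instance when an explicit remote node was given, and returns the resulting
  # job list via ResultWithJobs.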
  def Exec(self, feedback_fn):
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)

    if not self.instance_names:
      # No instances to evacuate
      jobs = []

    elif self.op.iallocator is not None:
      # TODO: Implement relocation to other group
      req = iallocator.IAReqNodeEvac(evac_mode=self.op.mode,
                                     instances=list(self.instance_names))
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)

      ial.Run(self.op.iallocator)

      if not ial.success:
        raise errors.OpPrereqError("Can't compute node evacuation using"
                                   " iallocator '%s': %s" %
                                   (self.op.iallocator, ial.info),
                                   errors.ECODE_NORES)

      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)

    elif self.op.remote_node is not None:
      assert self.op.mode == constants.NODE_EVAC_SEC
      jobs = [
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
                                        remote_node=self.op.remote_node,
                                        disks=[],
                                        mode=constants.REPLACE_DISK_CHG,
                                        early_release=self.op.early_release)]
        for instance_name in self.instance_names]

    else:
      raise errors.ProgrammerError("No iallocator or remote node")

    return ResultWithJobs(jobs)


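# LUNodeMigrate is a thin fan-out: it emits one OpInstanceMigrate job per
# primary instance of the node and lets those jobs do the actual work.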
class LUNodeMigrate(LogicalUnit):
  """Migrate all instances from a node.

  """
  HPATH = "node-migrate"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False

  def CheckArguments(self):
    pass

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    return {
      "NODE_NAME": self.op.node_name,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()]
    return (nl, nl)

  def CheckPrereq(self):
    pass

  def Exec(self, feedback_fn):
    # Prepare jobs for migration instances
    jobs = [
      [opcodes.OpInstanceMigrate(
        instance_name=inst.name,
        mode=self.op.mode,
        live=self.op.live,
        iallocator=self.op.iallocator,
        target_node=self.op.target_node,
        allow_runtime_changes=self.op.allow_runtime_changes,
        ignore_ipolicy=self.op.ignore_ipolicy)]
      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]

    # TODO: Run iallocator in this opcode and pass correct placement options to
    # OpInstanceMigrate. Since other jobs can modify the cluster between
    # running the iallocator and the actual migration, a good consistency model
    # will have to be found.

    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset([self.op.node_uuid]))

    return ResultWithJobs(jobs)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]

  return []


class LUNodeModifyStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_uuid,
      }

  def Exec(self, feedback_fn):
    """Modifies a storage volume on a node.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_uuid,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


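# Query support is split in two: NodeQuery implements the generic QueryBase
# machinery (field list, locking, data gathering), while LUNodeQuery further
# below merely wraps it for the old-style query opcode.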
1171
class NodeQuery(QueryBase):
1172
  FIELDS = query.NODE_FIELDS
1173

    
1174
  def ExpandNames(self, lu):
1175
    lu.needed_locks = {}
1176
    lu.share_locks = ShareAll()
1177

    
1178
    if self.names:
1179
      (self.wanted, _) = GetWantedNodes(lu, self.names)
1180
    else:
1181
      self.wanted = locking.ALL_SET
1182

    
1183
    self.do_locking = (self.use_locking and
1184
                       query.NQ_LIVE in self.requested_data)
1185

    
1186
    if self.do_locking:
1187
      # If any non-static field is requested we need to lock the nodes
1188
      lu.needed_locks[locking.LEVEL_NODE] = self.wanted
1189
      lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1190

    
1191
  def DeclareLocks(self, lu, level):
1192
    pass
1193

    
1194
  def _GetQueryData(self, lu):
1195
    """Computes the list of nodes and their attributes.
1196

1197
    """
1198
    all_info = lu.cfg.GetAllNodesInfo()
1199

    
1200
    node_uuids = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)
1201

    
1202
    # Gather data as requested
1203
    if query.NQ_LIVE in self.requested_data:
1204
      # filter out non-vm_capable nodes
1205
      toquery_node_uuids = [node.uuid for node in all_info.values()
1206
                            if node.vm_capable and node.uuid in node_uuids]
1207
      default_template = lu.cfg.GetClusterInfo().enabled_disk_templates[0]
1208
      raw_storage_units = utils.storage.GetStorageUnits(
1209
          lu.cfg, [default_template])
1210
      storage_units = rpc.PrepareStorageUnitsForNodes(
1211
          lu.cfg, raw_storage_units, toquery_node_uuids)
1212
      default_hypervisor = lu.cfg.GetHypervisorType()
1213
      hvparams = lu.cfg.GetClusterInfo().hvparams[default_hypervisor]
1214
      hvspecs = [(default_hypervisor, hvparams)]
1215
      node_data = lu.rpc.call_node_info(toquery_node_uuids, storage_units,
1216
                                        hvspecs)
1217
      live_data = dict(
1218
          (uuid, rpc.MakeLegacyNodeInfo(nresult.payload, default_template))
1219
          for (uuid, nresult) in node_data.items()
1220
          if not nresult.fail_msg and nresult.payload)
1221
    else:
1222
      live_data = None
1223

    
1224
    if query.NQ_INST in self.requested_data:
1225
      node_to_primary = dict([(uuid, set()) for uuid in node_uuids])
1226
      node_to_secondary = dict([(uuid, set()) for uuid in node_uuids])
1227

    
1228
      inst_data = lu.cfg.GetAllInstancesInfo()
1229
      inst_uuid_to_inst_name = {}
1230

    
1231
      for inst in inst_data.values():
1232
        inst_uuid_to_inst_name[inst.uuid] = inst.name
1233
        if inst.primary_node in node_to_primary:
1234
          node_to_primary[inst.primary_node].add(inst.uuid)
1235
        for secnode in inst.secondary_nodes:
1236
          if secnode in node_to_secondary:
1237
            node_to_secondary[secnode].add(inst.uuid)
1238
    else:
1239
      node_to_primary = None
1240
      node_to_secondary = None
1241
      inst_uuid_to_inst_name = None
1242

    
1243
    if query.NQ_OOB in self.requested_data:
1244
      oob_support = dict((uuid, bool(SupportsOob(lu.cfg, node)))
1245
                         for uuid, node in all_info.iteritems())
1246
    else:
1247
      oob_support = None
1248

    
1249
    if query.NQ_GROUP in self.requested_data:
1250
      groups = lu.cfg.GetAllNodeGroupsInfo()
1251
    else:
1252
      groups = {}
1253

    
1254
    return query.NodeQueryData([all_info[uuid] for uuid in node_uuids],
1255
                               live_data, lu.cfg.GetMasterNode(),
1256
                               node_to_primary, node_to_secondary,
1257
                               inst_uuid_to_inst_name, groups, oob_support,
1258
                               lu.cfg.GetClusterInfo())
1259

    
1260

    
1261
class LUNodeQuery(NoHooksLU):
1262
  """Logical unit for querying nodes.
1263

1264
  """
1265
  # pylint: disable=W0142
1266
  REQ_BGL = False
1267

    
1268
  def CheckArguments(self):
1269
    self.nq = NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
1270
                         self.op.output_fields, self.op.use_locking)
1271

    
1272
  def ExpandNames(self):
1273
    self.nq.ExpandNames(self)
1274

    
1275
  def DeclareLocks(self, level):
1276
    self.nq.DeclareLocks(self, level)
1277

    
1278
  def Exec(self, feedback_fn):
1279
    return self.nq.OldStyleQuery(self)
1280

    
1281

    
1282
def _CheckOutputFields(fields, selected):
1283
  """Checks whether all selected fields are valid according to fields.
1284

1285
  @type fields: L{utils.FieldSet}
1286
  @param fields: fields set
1287
  @type selected: L{utils.FieldSet}
1288
  @param selected: fields set
1289

1290
  """
1291
  delta = fields.NonMatching(selected)
1292
  if delta:
1293
    raise errors.OpPrereqError("Unknown output fields selected: %s"
1294
                               % ",".join(delta), errors.ECODE_INVAL)
1295

    
1296

    
1297
class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(constants.VF_NODE, constants.VF_PHYS,
                                      constants.VF_VG, constants.VF_NAME,
                                      constants.VF_SIZE, constants.VF_INSTANCE),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    volumes = self.rpc.call_node_volumes(node_uuids)

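    # The instance owning each logical volume is looked up further down with
    # a (node_uuid, "<vg>/<lv name>") key in the map returned by
    # MapInstanceLvsToNodes; it fills the "instance" output field.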
    ilist = self.cfg.GetAllInstancesInfo()
    vol2inst = MapInstanceLvsToNodes(ilist.values())

    output = []
    for node_uuid in node_uuids:
      nresult = volumes[node_uuid]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue

      node_vols = sorted(nresult.payload,
                         key=operator.itemgetter(constants.VF_DEV))

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == constants.VF_NODE:
            val = self.cfg.GetNodeName(node_uuid)
          elif field == constants.VF_PHYS:
            val = vol[constants.VF_DEV]
          elif field == constants.VF_VG:
            val = vol[constants.VF_VG]
          elif field == constants.VF_NAME:
            val = vol[constants.VF_NAME]
          elif field == constants.VF_SIZE:
            val = int(float(vol[constants.VF_SIZE]))
          elif field == constants.VF_INSTANCE:
            inst = vol2inst.get((node_uuid, vol[constants.VF_VG] + "/" +
                                 vol[constants.VF_NAME]), None)
            if inst is not None:
              val = inst.name
            else:
              val = "-"
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

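  # When no storage type is requested explicitly, the first enabled disk
  # template of the cluster is mapped to its storage type via
  # constants.MAP_DISK_TEMPLATE_STORAGE_TYPE and used as the default.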
  def _DetermineStorageType(self):
    """Determines the default storage type of the cluster.

    """
    enabled_disk_templates = self.cfg.GetClusterInfo().enabled_disk_templates
    default_storage_type = \
        constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[enabled_disk_templates[0]]
    return default_storage_type

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.storage_type:
      CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()
      if self.storage_type not in constants.STS_REPORT:
        raise errors.OpPrereqError(
            "Storage reporting for storage type '%s' is not supported. Please"
            " use the --storage-type option to specify one of the supported"
            " storage types (%s) or set the default disk template to one that"
            " supports storage reporting." %
            (self.storage_type, utils.CommaJoin(constants.STS_REPORT)))

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    if self.op.storage_type:
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()

    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

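    # field_idx maps each requested field name to its column index in the
    # rows returned by call_storage_list; SF_NAME is always present (it was
    # added above) and is used to key and sort the per-node rows.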
    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      nresult = data[node_uuid]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s",
                        node_name, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node_name
          elif field == constants.SF_TYPE:
            val = self.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result


class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_uuid)
    except ValueError:
      pass
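    # The node being removed is excluded from both hook phases here; the
    # post hook on the node itself is run explicitly from Exec() via
    # RunPostHook, just before the node leaves the cluster.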
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    assert node is not None

    masternode = self.cfg.GetMasterNode()
    if node.uuid == masternode:
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)

    for _, instance in self.cfg.GetAllInstancesInfo().items():
      if node.uuid in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first" % instance.name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    logging.info("Stopping the node daemon and removing configs from node %s",
                 self.node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # Promote nodes to master candidate as needed
    AdjustCandidatePool(self, exceptions=[self.node.uuid])
    self.context.RemoveNode(self.node)

    # Run post hooks on the node before it's removed
    RunPostHook(self, self.node.name)

    # we have to call this by name rather than by UUID, as the node is no
    # longer in the config
    result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node_uuid = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node_uuid,
                                              constants.ETC_HOSTS_REMOVE,
                                              self.node.name, None)
      result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)


class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

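    # Only storage types whose backend supports the "fix consistency"
    # operation can be repaired; anything else is rejected up front.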
    if (constants.SO_FIX_CONSISTENCY not in
        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " repaired" % storage_type,
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def _CheckFaultyDisks(self, instance, node_uuid):
    """Ensure faulty disks abort the opcode or at least warn."""
    try:
      if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                 node_uuid, True):
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                   " node '%s'" %
                                   (instance.name,
                                    self.cfg.GetNodeName(node_uuid)),
                                   errors.ECODE_STATE)
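    # If the caller asked to ignore consistency problems, downgrade the
    # prerequisite failure above to a warning instead of aborting.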
    except errors.OpPrereqError, err:
      if self.op.ignore_consistency:
        self.LogWarning(str(err.args[0]))
      else:
        raise

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

    # Check whether any instance on this node has faulty disks
    for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
      if not inst.disks_active:
        continue
      check_nodes = set(inst.all_nodes)
      check_nodes.discard(self.op.node_uuid)
      for inst_node_uuid in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_uuid)

  def Exec(self, feedback_fn):
    feedback_fn("Repairing storage unit '%s' on %s ..." %
                (self.op.name, self.op.node_name))

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_execute(self.op.node_uuid,
                                           self.op.storage_type, st_args,
                                           self.op.name,
                                           constants.SO_FIX_CONSISTENCY)
    result.Raise("Failed to repair storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))