# lib/cmdlib/node.py @ 30796ad6

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with nodes."""

import logging
import operator

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import qlang
from ganeti import query
from ganeti import rpc
from ganeti import utils
from ganeti.masterd import iallocator

from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
  ResultWithJobs
from ganeti.cmdlib.common import CheckParamsNotGlobal, \
  MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  IsExclusiveStorageEnabledNode, CheckNodePVs, \
  RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
  CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
  AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
  GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
  FindFaultyInstanceDisks, CheckStorageTypeEnabled


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max by one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should
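  # Illustrative example (added for clarity, not in the upstream code): with
  # candidate_pool_size = 10, mc_now = 3 and mc_should = 3, the incoming node
  # raises mc_should to min(3 + 1, 10) = 4, so 3 < 4 and the node promotes
  # itself; once mc_now has reached cp_size the comparison is always False.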


def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: L{objects.Node}
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip,
  and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  # this can be called with a new node, which has no UUID yet, so perform the
  # RPC call using its name
  result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node.name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)


class LUNodeAdd(LogicalUnit):
  """Logical unit for adding a node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _NFLAGS = ["master_capable", "vm_capable"]

  def CheckArguments(self):
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
    # validate/normalize the node name
    self.hostname = netutils.GetHostname(name=self.op.node_name,
                                         family=self.primary_ip_family)
    self.op.node_name = self.hostname.name

    if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
      raise errors.OpPrereqError("Cannot readd the master node",
                                 errors.ECODE_STATE)

    if self.op.readd and self.op.group:
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                 " being readded", errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    hook_nodes = self.cfg.GetNodeList()
    new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
    if new_node_info is not None:
      # Exclude added node
      hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))

    # add the new node as post hook node by name; it does not have a UUID yet
    return (hook_nodes, hook_nodes, [self.op.node_name, ])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.hostname.name
    self.op.primary_ip = self.hostname.ip
    if self.op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
                                   " IPv4 address must be given as secondary",
                                   errors.ECODE_INVAL)
      self.op.secondary_ip = self.op.primary_ip

    secondary_ip = self.op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip, errors.ECODE_INVAL)

    existing_node_info = self.cfg.GetNodeInfoByName(node_name)
    if not self.op.readd and existing_node_info is not None:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node_name, errors.ECODE_EXISTS)
    elif self.op.readd and existing_node_info is None:
      raise errors.OpPrereqError("Node %s is not in the configuration" %
                                 node_name, errors.ECODE_NOENT)

    self.changed_primary_ip = False

    for existing_node in self.cfg.GetAllNodesInfo().values():
      if self.op.readd and node_name == existing_node.name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != self.op.primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == self.op.primary_ip or
          existing_node.secondary_ip == self.op.primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # After this 'if' block, None is no longer a valid value for the
    # _capable op attributes
    if self.op.readd:
      assert existing_node_info is not None, \
        "Can't retrieve locked node %s" % node_name
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, getattr(existing_node_info, attr))
    else:
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, True)

    if self.op.readd and not self.op.vm_capable:
      pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % node_name,
                                   errors.ECODE_STATE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == self.op.primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                              source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [existing_node_info.uuid]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False

    if self.op.readd:
      self.new_node = existing_node_info
    else:
      node_group = self.cfg.LookupNodeGroup(self.op.group)
      self.new_node = objects.Node(name=node_name,
                                   primary_ip=self.op.primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=node_group, ndparams={})

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)

    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
    #       it a property on the base class.
    rpcrunner = rpc.DnsOnlyRunner()
    result = rpcrunner.call_version([node_name])[node_name]
    result.Raise("Can't get version information from node %s" % node_name)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node_name, result.payload)
    else:
      raise errors.OpPrereqError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.payload),
                                 errors.ECODE_ENVIRON)

    vg_name = self.cfg.GetVGName()
    if vg_name is not None:
      vparams = {constants.NV_PVLIST: [vg_name]}
      excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
      cname = self.cfg.GetClusterName()
      result = rpcrunner.call_node_verify_light(
          [node_name], vparams, cname,
          self.cfg.GetClusterInfo().hvparams)[node_name]
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
      if errmsgs:
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # We are adding a new node, so we assume it's powered
    self.new_node.powered = True

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      self.new_node.offline = False
      self.new_node.drained = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      self.new_node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        self.new_node.primary_ip = self.op.primary_ip

    # copy the master/vm_capable flags
    for attr in self._NFLAGS:
      setattr(self.new_node, attr, getattr(self.op, attr))

    # notify the user about any possible mc promotion
    if self.new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    if self.op.ndparams:
      self.new_node.ndparams = self.op.ndparams
    else:
      self.new_node.ndparams = {}

    if self.op.hv_state:
      self.new_node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.new_node.disk_state_static = self.new_disk_state

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(
                 master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
                 self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    if self.new_node.secondary_ip != self.new_node.primary_ip:
      _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
                               False)

    node_verifier_uuids = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: ([self.new_node.name], {}),
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(
               node_verifier_uuids, node_verify_param,
               self.cfg.GetClusterName(),
               self.cfg.GetClusterInfo().hvparams)
    for verifier in node_verifier_uuids:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed")

    if self.op.readd:
      self.context.ReaddNode(self.new_node)
      RedistributeAncillaryFiles(self)
      # make sure we redistribute the config
      self.cfg.Update(self.new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not self.new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
        result.Warn("Node failed to demote itself from master candidate status",
                    self.LogWarning)
    else:
      self.context.AddNode(self.new_node, self.proc.GetECId())
      RedistributeAncillaryFiles(self)


class LUNodeSetParams(LogicalUnit):
  """Modifies the parameters of a node.

  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
      to the node role (as _ROLE_*)
  @cvar _R2F: a dictionary from node role to tuples of flags
  @cvar _FLAGS: a list of attribute names corresponding to the flags

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
  _F2R = {
    (True, False, False): _ROLE_CANDIDATE,
    (False, True, False): _ROLE_DRAINED,
    (False, False, True): _ROLE_OFFLINE,
    (False, False, False): _ROLE_REGULAR,
    }
  _R2F = dict((v, k) for k, v in _F2R.items())
  _FLAGS = ["master_candidate", "drained", "offline"]

    
416
  def CheckArguments(self):
417
    (self.op.node_uuid, self.op.node_name) = \
418
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
419
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
420
                self.op.master_capable, self.op.vm_capable,
421
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
422
                self.op.disk_state]
423
    if all_mods.count(None) == len(all_mods):
424
      raise errors.OpPrereqError("Please pass at least one modification",
425
                                 errors.ECODE_INVAL)
426
    if all_mods.count(True) > 1:
427
      raise errors.OpPrereqError("Can't set the node into more than one"
428
                                 " state at the same time",
429
                                 errors.ECODE_INVAL)
430

    
431
    # Boolean value that tells us whether we might be demoting from MC
432
    self.might_demote = (self.op.master_candidate is False or
433
                         self.op.offline is True or
434
                         self.op.drained is True or
435
                         self.op.master_capable is False)
436

    
437
    if self.op.secondary_ip:
438
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
439
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
440
                                   " address" % self.op.secondary_ip,
441
                                   errors.ECODE_INVAL)
442

    
443
    self.lock_all = self.op.auto_promote and self.might_demote
444
    self.lock_instances = self.op.secondary_ip is not None
445

    
446
  def _InstanceFilter(self, instance):
447
    """Filter for getting affected instances.
448

449
    """
450
    return (instance.disk_template in constants.DTS_INT_MIRROR and
451
            self.op.node_uuid in instance.all_nodes)
452

    
453
  def ExpandNames(self):
454
    if self.lock_all:
455
      self.needed_locks = {
456
        locking.LEVEL_NODE: locking.ALL_SET,
457

    
458
        # Block allocations when all nodes are locked
459
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
460
        }
461
    else:
462
      self.needed_locks = {
463
        locking.LEVEL_NODE: self.op.node_uuid,
464
        }
465

    
466
    # Since modifying a node can have severe effects on currently running
467
    # operations the resource lock is at least acquired in shared mode
468
    self.needed_locks[locking.LEVEL_NODE_RES] = \
469
      self.needed_locks[locking.LEVEL_NODE]
470

    
471
    # Get all locks except nodes in shared mode; they are not used for anything
472
    # but read-only access
473
    self.share_locks = ShareAll()
474
    self.share_locks[locking.LEVEL_NODE] = 0
475
    self.share_locks[locking.LEVEL_NODE_RES] = 0
476
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 0
477

    
478
    if self.lock_instances:
479
      self.needed_locks[locking.LEVEL_INSTANCE] = \
480
        self.cfg.GetInstanceNames(
481
          self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())
482

    
483
  def BuildHooksEnv(self):
484
    """Build hooks env.
485

486
    This runs on the master node.
487

488
    """
489
    return {
490
      "OP_TARGET": self.op.node_name,
491
      "MASTER_CANDIDATE": str(self.op.master_candidate),
492
      "OFFLINE": str(self.op.offline),
493
      "DRAINED": str(self.op.drained),
494
      "MASTER_CAPABLE": str(self.op.master_capable),
495
      "VM_CAPABLE": str(self.op.vm_capable),
496
      }
497

    
498
  def BuildHooksNodes(self):
499
    """Build hooks nodes.
500

501
    """
502
    nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
503
    return (nl, nl)
504

    
  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    if self.lock_instances:
      affected_instances = \
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)

      # Verify instance locks
      owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
      wanted_instance_names = frozenset([inst.name for inst in
                                         affected_instances.values()])
      if wanted_instance_names - owned_instance_names:
        raise errors.OpPrereqError("Instances affected by changing node %s's"
                                   " secondary IP address have changed since"
                                   " locks were acquired, wanted '%s', have"
                                   " '%s'; retry the operation" %
                                   (node.name,
                                    utils.CommaJoin(wanted_instance_names),
                                    utils.CommaJoin(owned_instance_names)),
                                   errors.ECODE_STATE)
    else:
      affected_instances = None

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if node.uuid == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via master-failover",
                                   errors.ECODE_INVAL)

    if self.op.master_candidate and not node.master_capable:
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
                                 " it a master candidate" % node.name,
                                 errors.ECODE_STATE)

    if self.op.vm_capable is False:
      (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
      if ipri or isec:
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
                                   " the vm_capable flag" % node.name,
                                   errors.ECODE_STATE)

    if node.master_candidate and self.might_demote and not self.lock_all:
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
      # check if after removing the current node, we're missing master
      # candidates
      (mc_remaining, mc_should, _) = \
          self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
      if mc_remaining < mc_should:
        raise errors.OpPrereqError("Not enough master candidates, please"
                                   " pass auto promote option to allow"
                                   " promotion (--auto-promote or RAPI"
                                   " auto_promote=True)", errors.ECODE_STATE)

    self.old_flags = old_flags = (node.master_candidate,
                                  node.drained, node.offline)
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
    self.old_role = old_role = self._F2R[old_flags]

    # Check for ineffective changes
    for attr in self._FLAGS:
      if getattr(self.op, attr) is False and getattr(node, attr) is False:
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
        setattr(self.op, attr, None)

    # Past this point, any flag change to False means a transition
    # away from the respective state, as only real changes are kept

    # TODO: We might query the real power state if it supports OOB
    if SupportsOob(self.cfg, node):
      if self.op.offline is False and not (node.powered or
                                           self.op.powered is True):
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
                                    " offline status can be reset") %
                                   self.op.node_name, errors.ECODE_STATE)
    elif self.op.powered is not None:
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
                                  " as it does not support out-of-band"
                                  " handling") % self.op.node_name,
                                 errors.ECODE_STATE)

    # If we're being deofflined/drained, we'll MC ourself if needed
    if (self.op.drained is False or self.op.offline is False or
        (self.op.master_capable and not node.master_capable)):
      if _DecideSelfPromotion(self):
        self.op.master_candidate = True
        self.LogInfo("Auto-promoting node to master candidate")

    # If we're no longer master capable, we'll demote ourselves from MC
    if self.op.master_capable is False and node.master_candidate:
      self.LogInfo("Demoting from master candidate")
      self.op.master_candidate = False

    # Compute new role
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
    if self.op.master_candidate:
      new_role = self._ROLE_CANDIDATE
    elif self.op.drained:
      new_role = self._ROLE_DRAINED
    elif self.op.offline:
      new_role = self._ROLE_OFFLINE
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
      # False is still in new flags, which means we're un-setting (the
      # only) True flag
      new_role = self._ROLE_REGULAR
    else: # no new flags, nothing, keep old role
      new_role = old_role

    self.new_role = new_role

    if old_role == self._ROLE_OFFLINE and new_role != old_role:
      # Trying to transition out of offline status
      result = self.rpc.call_version([node.uuid])[node.uuid]
      if result.fail_msg:
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
                                   " to report its version: %s" %
                                   (node.name, result.fail_msg),
                                   errors.ECODE_STATE)
      else:
        self.LogWarning("Transitioning node from offline to online state"
                        " without using re-add. Please make sure the node"
                        " is healthy!")

    # When changing the secondary ip, verify if this is a single-homed to
    # multi-homed transition or vice versa, and apply the relevant
    # restrictions.
    if self.op.secondary_ip:
      # Ok even without locking, because this can't be changed by any LU
      master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
      master_singlehomed = master.secondary_ip == master.primary_ip
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from single-homed to multi-homed"
                          " cluster; all nodes will require a secondary IP"
                          " address")
        else:
          raise errors.OpPrereqError("Changing the secondary ip on a"
                                     " single-homed cluster requires the"
                                     " --force option to be passed, and the"
                                     " target node to be the master",
                                     errors.ECODE_INVAL)
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from multi-homed to single-homed"
                          " cluster; secondary IP addresses will have to be"
                          " removed")
        else:
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
                                     " same as the primary IP on a multi-homed"
                                     " cluster, unless the --force option is"
                                     " passed, and the target node is the"
                                     " master", errors.ECODE_INVAL)

      assert not (set([inst.name for inst in affected_instances.values()]) -
                  self.owned_locks(locking.LEVEL_INSTANCE))

      if node.offline:
        if affected_instances:
          msg = ("Cannot change secondary IP address: offline node has"
                 " instances (%s) configured to use it" %
                 utils.CommaJoin(
                   [inst.name for inst in affected_instances.values()]))
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
      else:
        # On online nodes, check that no instances are running, and that
        # the node has the new ip and we can reach it.
        for instance in affected_instances.values():
          CheckInstanceState(self, instance, INSTANCE_DOWN,
                             msg="cannot change secondary ip")

        _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
        if master.uuid != node.uuid:
          # check reachability from master secondary ip to new secondary ip
          if not netutils.TcpPing(self.op.secondary_ip,
                                  constants.DEFAULT_NODED_PORT,
                                  source=master.secondary_ip):
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                       " based ping to node daemon port",
                                       errors.ECODE_ENVIRON)

    if self.op.ndparams:
      new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")
      self.new_ndparams = new_ndparams

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                                node.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)

    
706
  def Exec(self, feedback_fn):
707
    """Modifies a node.
708

709
    """
710
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
711
    result = []
712

    
713
    if self.op.ndparams:
714
      node.ndparams = self.new_ndparams
715

    
716
    if self.op.powered is not None:
717
      node.powered = self.op.powered
718

    
719
    if self.op.hv_state:
720
      node.hv_state_static = self.new_hv_state
721

    
722
    if self.op.disk_state:
723
      node.disk_state_static = self.new_disk_state
724

    
725
    for attr in ["master_capable", "vm_capable"]:
726
      val = getattr(self.op, attr)
727
      if val is not None:
728
        setattr(node, attr, val)
729
        result.append((attr, str(val)))
730

    
731
    if self.new_role != self.old_role:
732
      # Tell the node to demote itself, if no longer MC and not offline
733
      if self.old_role == self._ROLE_CANDIDATE and \
734
          self.new_role != self._ROLE_OFFLINE:
735
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
736
        if msg:
737
          self.LogWarning("Node failed to demote itself: %s", msg)
738

    
739
      new_flags = self._R2F[self.new_role]
740
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
741
        if of != nf:
742
          result.append((desc, str(nf)))
743
      (node.master_candidate, node.drained, node.offline) = new_flags
744

    
745
      # we locked all nodes, we adjust the CP before updating this node
746
      if self.lock_all:
747
        AdjustCandidatePool(self, [node.uuid])
748

    
749
    if self.op.secondary_ip:
750
      node.secondary_ip = self.op.secondary_ip
751
      result.append(("secondary_ip", self.op.secondary_ip))
752

    
753
    # this will trigger configuration file update, if needed
754
    self.cfg.Update(node, feedback_fn)
755

    
756
    # this will trigger job queue propagation or cleanup if the mc
757
    # flag changed
758
    if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
759
      self.context.ReaddNode(node)
760

    
761
    return result
762

    
763

    
764
class LUNodePowercycle(NoHooksLU):
765
  """Powercycles a node.
766

767
  """
768
  REQ_BGL = False
769

    
770
  def CheckArguments(self):
771
    (self.op.node_uuid, self.op.node_name) = \
772
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
773

    
774
    if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
775
      raise errors.OpPrereqError("The node is the master and the force"
776
                                 " parameter was not set",
777
                                 errors.ECODE_INVAL)
778

    
779
  def ExpandNames(self):
780
    """Locking for PowercycleNode.
781

782
    This is a last-resort option and shouldn't block on other
783
    jobs. Therefore, we grab no locks.
784

785
    """
786
    self.needed_locks = {}
787

    
788
  def Exec(self, feedback_fn):
789
    """Reboots a node.
790

791
    """
792
    default_hypervisor = self.cfg.GetHypervisorType()
793
    hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
794
    result = self.rpc.call_node_powercycle(self.op.node_uuid,
795
                                           default_hypervisor,
796
                                           hvparams)
797
    result.Raise("Failed to schedule the reboot")
798
    return result.payload
799

    
800

    
801
def _GetNodeInstancesInner(cfg, fn):
802
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
803

    
804

    
805
def _GetNodePrimaryInstances(cfg, node_uuid):
806
  """Returns primary instances on a node.
807

808
  """
809
  return _GetNodeInstancesInner(cfg,
810
                                lambda inst: node_uuid == inst.primary_node)
811

    
812

    
813
def _GetNodeSecondaryInstances(cfg, node_uuid):
814
  """Returns secondary instances on a node.
815

816
  """
817
  return _GetNodeInstancesInner(cfg,
818
                                lambda inst: node_uuid in inst.secondary_nodes)
819

    
820

    
821
def _GetNodeInstances(cfg, node_uuid):
822
  """Returns a list of all primary and secondary instances on a node.
823

824
  """
825

    
826
  return _GetNodeInstancesInner(cfg, lambda inst: node_uuid in inst.all_nodes)
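
# Illustrative note (added for clarity, not in the upstream code): for a given
# node UUID, _GetNodeInstances returns the union of _GetNodePrimaryInstances
# and _GetNodeSecondaryInstances, since inst.all_nodes holds the primary node
# followed by the instance's secondary nodes.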


class LUNodeEvacuate(NoHooksLU):
  """Evacuates instances off a list of nodes.

  """
  REQ_BGL = False

  _MODE2IALLOCATOR = {
    constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI,
    constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC,
    constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL,
    }
  assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES
  assert (frozenset(_MODE2IALLOCATOR.values()) ==
          constants.IALLOCATOR_NEVAC_MODES)

  def CheckArguments(self):
    CheckIAllocatorOrNode(self, "iallocator", "remote_node")

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)
      assert self.op.remote_node

      if self.op.node_uuid == self.op.remote_node_uuid:
        raise errors.OpPrereqError("Can not use evacuated node as a new"
                                   " secondary node", errors.ECODE_INVAL)

      if self.op.mode != constants.NODE_EVAC_SEC:
        raise errors.OpPrereqError("Without the use of an iallocator only"
                                   " secondary instances can be evacuated",
                                   errors.ECODE_INVAL)

    # Declare locks
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

    # Determine nodes (via group) optimistically, needs verification once locks
    # have been acquired
    self.lock_nodes = self._DetermineNodes()

  def _DetermineNodes(self):
    """Gets the list of node UUIDs to operate on.

    """
    if self.op.remote_node is None:
      # Iallocator will choose any node(s) in the same group
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
    else:
      group_nodes = frozenset([self.op.remote_node_uuid])

    # Determine nodes to be locked
    return set([self.op.node_uuid]) | group_nodes

  def _DetermineInstances(self):
    """Builds list of instances to operate on.

    """
    assert self.op.mode in constants.NODE_EVAC_MODES

    if self.op.mode == constants.NODE_EVAC_PRI:
      # Primary instances only
      inst_fn = _GetNodePrimaryInstances
      assert self.op.remote_node is None, \
        "Evacuating primary instances requires iallocator"
    elif self.op.mode == constants.NODE_EVAC_SEC:
      # Secondary instances only
      inst_fn = _GetNodeSecondaryInstances
    else:
      # All instances
      assert self.op.mode == constants.NODE_EVAC_ALL
      inst_fn = _GetNodeInstances
      # TODO: In 2.6, change the iallocator interface to take an evacuation mode
      # per instance
      raise errors.OpPrereqError("Due to an issue with the iallocator"
                                 " interface it is not possible to evacuate"
                                 " all instances at once; specify explicitly"
                                 " whether to evacuate primary or secondary"
                                 " instances",
                                 errors.ECODE_INVAL)

    return inst_fn(self.cfg, self.op.node_uuid)

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        set(i.name for i in self._DetermineInstances())

    elif level == locking.LEVEL_NODEGROUP:
      # Lock node groups for all potential target nodes optimistically, needs
      # verification once nodes have been acquired
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)

    elif level == locking.LEVEL_NODE:
      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes

  def CheckPrereq(self):
    # Verify locks
    owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)

    need_nodes = self._DetermineNodes()

    if not owned_nodes.issuperset(need_nodes):
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
                                 " locks were acquired, current nodes are"
                                 " '%s', used to be '%s'; retry the"
                                 " operation" %
                                 (self.op.node_name,
                                  utils.CommaJoin(need_nodes),
                                  utils.CommaJoin(owned_nodes)),
                                 errors.ECODE_STATE)

    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
    if owned_groups != wanted_groups:
      raise errors.OpExecError("Node groups changed since locks were acquired,"
                               " current groups are '%s', used to be '%s';"
                               " retry the operation" %
                               (utils.CommaJoin(wanted_groups),
                                utils.CommaJoin(owned_groups)))

    # Determine affected instances
    self.instances = self._DetermineInstances()
    self.instance_names = [i.name for i in self.instances]

    if set(self.instance_names) != owned_instance_names:
      raise errors.OpExecError("Instances on node '%s' changed since locks"
                               " were acquired, current instances are '%s',"
                               " used to be '%s'; retry the operation" %
                               (self.op.node_name,
                                utils.CommaJoin(self.instance_names),
                                utils.CommaJoin(owned_instance_names)))

    if self.instance_names:
      self.LogInfo("Evacuating instances from node '%s': %s",
                   self.op.node_name,
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
    else:
      self.LogInfo("No instances to evacuate from node '%s'",
                   self.op.node_name)

    if self.op.remote_node is not None:
      for i in self.instances:
        if i.primary_node == self.op.remote_node_uuid:
          raise errors.OpPrereqError("Node %s is the primary node of"
                                     " instance %s, cannot use it as"
                                     " secondary" %
                                     (self.op.remote_node, i.name),
                                     errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)

    if not self.instance_names:
      # No instances to evacuate
      jobs = []

    elif self.op.iallocator is not None:
      # TODO: Implement relocation to other group
      evac_mode = self._MODE2IALLOCATOR[self.op.mode]
      req = iallocator.IAReqNodeEvac(evac_mode=evac_mode,
                                     instances=list(self.instance_names))
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)

      ial.Run(self.op.iallocator)

      if not ial.success:
        raise errors.OpPrereqError("Can't compute node evacuation using"
                                   " iallocator '%s': %s" %
                                   (self.op.iallocator, ial.info),
                                   errors.ECODE_NORES)

      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)

    elif self.op.remote_node is not None:
      assert self.op.mode == constants.NODE_EVAC_SEC
      jobs = [
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
                                        remote_node=self.op.remote_node,
                                        disks=[],
                                        mode=constants.REPLACE_DISK_CHG,
                                        early_release=self.op.early_release)]
        for instance_name in self.instance_names]

    else:
      raise errors.ProgrammerError("No iallocator or remote node")

    return ResultWithJobs(jobs)


class LUNodeMigrate(LogicalUnit):
  """Migrate all instances from a node.

  """
  HPATH = "node-migrate"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False

  def CheckArguments(self):
    pass

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    return {
      "NODE_NAME": self.op.node_name,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()]
    return (nl, nl)

  def CheckPrereq(self):
    pass

  def Exec(self, feedback_fn):
    # Prepare jobs for migrating instances
    jobs = [
      [opcodes.OpInstanceMigrate(
        instance_name=inst.name,
        mode=self.op.mode,
        live=self.op.live,
        iallocator=self.op.iallocator,
        target_node=self.op.target_node,
        allow_runtime_changes=self.op.allow_runtime_changes,
        ignore_ipolicy=self.op.ignore_ipolicy)]
      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]

    # TODO: Run iallocator in this opcode and pass correct placement options to
    # OpInstanceMigrate. Since other jobs can modify the cluster between
    # running the iallocator and the actual migration, a good consistency model
    # will have to be found.

    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset([self.op.node_uuid]))

    return ResultWithJobs(jobs)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]

  return []
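
# Illustrative note (added for clarity, not in the upstream code): for
# constants.ST_FILE the helper above returns a one-element argument list,
# [[file_storage_dir, shared_file_storage_dir]]; every other storage type
# gets an empty argument list.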


class LUNodeModifyStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_uuid,
      }

  def Exec(self, feedback_fn):
    """Modifies a storage unit on the given node.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_uuid,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


class NodeQuery(QueryBase):
  FIELDS = query.NODE_FIELDS

  def ExpandNames(self, lu):
    lu.needed_locks = {}
    lu.share_locks = ShareAll()

    if self.names:
      (self.wanted, _) = GetWantedNodes(lu, self.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = (self.use_locking and
                       query.NQ_LIVE in self.requested_data)

    if self.do_locking:
      # If any non-static field is requested we need to lock the nodes
      lu.needed_locks[locking.LEVEL_NODE] = self.wanted
      lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    all_info = lu.cfg.GetAllNodesInfo()

    node_uuids = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)

    # Gather data as requested
    if query.NQ_LIVE in self.requested_data:
      # filter out non-vm_capable nodes
      toquery_node_uuids = [node.uuid for node in all_info.values()
                            if node.vm_capable and node.uuid in node_uuids]
      lvm_enabled = utils.storage.IsLvmEnabled(
          lu.cfg.GetClusterInfo().enabled_disk_templates)
      # FIXME: this per default asks for storage space information for all
      # enabled disk templates. Fix this by making it possible to specify
      # space report fields for specific disk templates.
      raw_storage_units = utils.storage.GetStorageUnitsOfCluster(
          lu.cfg, include_spindles=lvm_enabled)
      storage_units = rpc.PrepareStorageUnitsForNodes(
          lu.cfg, raw_storage_units, toquery_node_uuids)
      default_hypervisor = lu.cfg.GetHypervisorType()
      hvparams = lu.cfg.GetClusterInfo().hvparams[default_hypervisor]
      hvspecs = [(default_hypervisor, hvparams)]
      node_data = lu.rpc.call_node_info(toquery_node_uuids, storage_units,
                                        hvspecs)
      live_data = dict(
          (uuid, rpc.MakeLegacyNodeInfo(nresult.payload,
                                        require_spindles=lvm_enabled))
          for (uuid, nresult) in node_data.items()
          if not nresult.fail_msg and nresult.payload)
    else:
      live_data = None

    if query.NQ_INST in self.requested_data:
      node_to_primary = dict([(uuid, set()) for uuid in node_uuids])
      node_to_secondary = dict([(uuid, set()) for uuid in node_uuids])

      inst_data = lu.cfg.GetAllInstancesInfo()
      inst_uuid_to_inst_name = {}

      for inst in inst_data.values():
        inst_uuid_to_inst_name[inst.uuid] = inst.name
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.uuid)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.uuid)
    else:
      node_to_primary = None
      node_to_secondary = None
      inst_uuid_to_inst_name = None

    if query.NQ_OOB in self.requested_data:
      oob_support = dict((uuid, bool(SupportsOob(lu.cfg, node)))
                         for uuid, node in all_info.iteritems())
    else:
      oob_support = None

    if query.NQ_GROUP in self.requested_data:
      groups = lu.cfg.GetAllNodeGroupsInfo()
    else:
      groups = {}

    return query.NodeQueryData([all_info[uuid] for uuid in node_uuids],
                               live_data, lu.cfg.GetMasterNode(),
                               node_to_primary, node_to_secondary,
                               inst_uuid_to_inst_name, groups, oob_support,
                               lu.cfg.GetClusterInfo())


class LUNodeQuery(NoHooksLU):
  """Logical unit for querying nodes.

  """
  # pylint: disable=W0142
  REQ_BGL = False

  def CheckArguments(self):
    self.nq = NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
                         self.op.output_fields, self.op.use_locking)

  def ExpandNames(self):
    self.nq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.nq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    return self.nq.OldStyleQuery(self)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)
1290

    
1291

    
1292
class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

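    # Lock only the requested nodes, or all nodes if none were specified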
    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    volumes = self.rpc.call_node_volumes(node_uuids)

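    # Map (node UUID, LV name) pairs to the instance owning that volume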
    ilist = self.cfg.GetAllInstancesInfo()
    vol2inst = MapInstanceLvsToNodes(ilist.values())

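    # One output row per volume, with columns in the requested field order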
    output = []
    for node_uuid in node_uuids:
      nresult = volumes[node_uuid]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue

      node_vols = sorted(nresult.payload,
                         key=operator.itemgetter("dev"))

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = self.cfg.GetNodeName(node_uuid)
          elif field == "phys":
            val = vol["dev"]
          elif field == "vg":
            val = vol["vg"]
          elif field == "name":
            val = vol["name"]
          elif field == "size":
            val = int(float(vol["size"]))
          elif field == "instance":
            inst = vol2inst.get((node_uuid, vol["vg"] + "/" + vol["name"]),
                                None)
            if inst is not None:
              val = inst.name
            else:
              val = "-"
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

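    # Remember each field's position in the rows returned by the RPC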
    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

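    # Query the storage units from all selected nodes via RPC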
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)

    result = []

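    # Build one result row per storage unit, in sorted node and unit order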
    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      nresult = data[node_uuid]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s",
                        node_name, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node_name
          elif field == constants.SF_TYPE:
            val = self.op.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result


class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_uuid)
    except ValueError:
      pass
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    assert node is not None

    masternode = self.cfg.GetMasterNode()
    if node.uuid == masternode:
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)

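    # Refuse removal while any instance still uses this node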
    for _, instance in self.cfg.GetAllInstancesInfo().items():
      if node.uuid in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first" % instance.name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    logging.info("Stopping the node daemon and removing configs from node %s",
                 self.node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # Promote nodes to master candidate as needed
    AdjustCandidatePool(self, exceptions=[self.node.uuid])
    self.context.RemoveNode(self.node)

    # Run post hooks on the node before it's removed
    RunPostHook(self, self.node.name)

    # we have to call this by name rather than by UUID, as the node is no
    # longer in the config
    result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node_uuid = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node_uuid,
                                              constants.ETC_HOSTS_REMOVE,
                                              self.node.name, None)
      result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)


class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

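    # Only storage types that support the repair operation are accepted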
    if (constants.SO_FIX_CONSISTENCY not in
        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " repaired" % storage_type,
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def _CheckFaultyDisks(self, instance, node_uuid):
    """Ensure faulty disks abort the opcode or at least warn."""
    try:
      if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                 node_uuid, True):
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                   " node '%s'" %
                                   (instance.name,
                                    self.cfg.GetNodeName(node_uuid)),
                                   errors.ECODE_STATE)
    except errors.OpPrereqError, err:
      if self.op.ignore_consistency:
        self.LogWarning(str(err.args[0]))
      else:
        raise

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

    # Check whether any instance on this node has faulty disks
    for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
      if not inst.disks_active:
        continue
      check_nodes = set(inst.all_nodes)
      check_nodes.discard(self.op.node_uuid)
      for inst_node_uuid in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_uuid)

  def Exec(self, feedback_fn):
    feedback_fn("Repairing storage unit '%s' on %s ..." %
                (self.op.name, self.op.node_name))

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_execute(self.op.node_uuid,
                                           self.op.storage_type, st_args,
                                           self.op.name,
                                           constants.SO_FIX_CONSISTENCY)
    result.Raise("Failed to repair storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))