#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with nodes."""

import logging
import operator

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import qlang
from ganeti import query
from ganeti import rpc
from ganeti import utils
from ganeti.masterd import iallocator

from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
  ResultWithJobs
from ganeti.cmdlib.common import CheckParamsNotGlobal, \
  MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  IsExclusiveStorageEnabledNode, CheckNodePVs, \
  RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
  CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
  AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
  GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
  FindFaultyInstanceDisks, CheckStorageTypeEnabled


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max by one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should
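# Worked example (hypothetical numbers): with candidate_pool_size = 10 and
# GetMasterCandidateStats() reporting mc_now = 3, mc_should = 3, the target
# becomes min(3 + 1, 10) = 4, so 3 < 4 and the joining node promotes itself;
# once the pool is already full, mc_should is capped at cp_size and no
# promotion happens.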


def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: L{objects.Node}
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip,
  and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  # this can be called with a new node, which has no UUID yet, so perform the
  # RPC call using its name
  result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node.name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)


class LUNodeAdd(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _NFLAGS = ["master_capable", "vm_capable"]

  def CheckArguments(self):
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
    # validate/normalize the node name
    self.hostname = netutils.GetHostname(name=self.op.node_name,
                                         family=self.primary_ip_family)
    self.op.node_name = self.hostname.name

    if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
      raise errors.OpPrereqError("Cannot readd the master node",
                                 errors.ECODE_STATE)

    if self.op.readd and self.op.group:
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                 " being readded", errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    hook_nodes = self.cfg.GetNodeList()
    new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
    if new_node_info is not None:
      # Exclude added node
      hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))

    # add the new node as post hook node by name; it does not have a UUID yet
    return (hook_nodes, hook_nodes, [self.op.node_name, ])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.hostname.name
    self.op.primary_ip = self.hostname.ip
    if self.op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
                                   " IPv4 address must be given as secondary",
                                   errors.ECODE_INVAL)
      self.op.secondary_ip = self.op.primary_ip

    secondary_ip = self.op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip, errors.ECODE_INVAL)

    existing_node_info = self.cfg.GetNodeInfoByName(node_name)
    if not self.op.readd and existing_node_info is not None:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node_name, errors.ECODE_EXISTS)
    elif self.op.readd and existing_node_info is None:
      raise errors.OpPrereqError("Node %s is not in the configuration" %
                                 node_name, errors.ECODE_NOENT)

    self.changed_primary_ip = False

    for existing_node in self.cfg.GetAllNodesInfo().values():
      if self.op.readd and node_name == existing_node.name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != self.op.primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == self.op.primary_ip or
          existing_node.secondary_ip == self.op.primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # After this 'if' block, None is no longer a valid value for the
    # _capable op attributes
    if self.op.readd:
      assert existing_node_info is not None, \
        "Can't retrieve locked node %s" % node_name
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, getattr(existing_node_info, attr))
    else:
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, True)

    if self.op.readd and not self.op.vm_capable:
      pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % node_name,
                                   errors.ECODE_STATE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = self.cfg.GetMasterNodeInfo()
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == self.op.primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                              source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [existing_node_info.uuid]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False

    if self.op.readd:
      self.new_node = existing_node_info
    else:
      node_group = self.cfg.LookupNodeGroup(self.op.group)
      self.new_node = objects.Node(name=node_name,
                                   primary_ip=self.op.primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=node_group, ndparams={})

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)

    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
    #       it a property on the base class.
    rpcrunner = rpc.DnsOnlyRunner()
    result = rpcrunner.call_version([node_name])[node_name]
    result.Raise("Can't get version information from node %s" % node_name)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node_name, result.payload)
    else:
      raise errors.OpPrereqError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.payload),
                                 errors.ECODE_ENVIRON)

    vg_name = self.cfg.GetVGName()
    if vg_name is not None:
      vparams = {constants.NV_PVLIST: [vg_name]}
      excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
      cname = self.cfg.GetClusterName()
      result = rpcrunner.call_node_verify_light(
          [node_name], vparams, cname,
          self.cfg.GetClusterInfo().hvparams)[node_name]
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
      if errmsgs:
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)

  def _InitOpenVSwitch(self):
    filled_ndparams = self.cfg.GetClusterInfo().FillND(
      self.new_node, self.cfg.GetNodeGroup(self.new_node.group))

    ovs = filled_ndparams.get(constants.ND_OVS, None)
    ovs_name = filled_ndparams.get(constants.ND_OVS_NAME, None)
    ovs_link = filled_ndparams.get(constants.ND_OVS_LINK, None)

    if ovs:
      if not ovs_link:
        self.LogInfo("No physical interface for OpenvSwitch was given."
                     " OpenvSwitch will not have an outside connection. This"
                     " might not be what you want.")

      result = self.rpc.call_node_configure_ovs(
                 self.new_node.name, ovs_name, ovs_link)
      result.Raise("Failed to initialize OpenVSwitch on new node")

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # We are adding a new node, so we assume it's powered
    self.new_node.powered = True

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      self.new_node.drained = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      self.new_node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        self.new_node.primary_ip = self.op.primary_ip

    # copy the master/vm_capable flags
    for attr in self._NFLAGS:
      setattr(self.new_node, attr, getattr(self.op, attr))

    # notify the user about any possible mc promotion
    if self.new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    if self.op.ndparams:
      self.new_node.ndparams = self.op.ndparams
    else:
      self.new_node.ndparams = {}

    if self.op.hv_state:
      self.new_node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.new_node.disk_state_static = self.new_disk_state

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(
                 master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
                 self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    if self.new_node.secondary_ip != self.new_node.primary_ip:
      _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
                               False)

    node_verifier_uuids = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: ([self.new_node.name], {}),
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(
               node_verifier_uuids, node_verify_param,
               self.cfg.GetClusterName(),
               self.cfg.GetClusterInfo().hvparams)
    for verifier in node_verifier_uuids:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed")

    self._InitOpenVSwitch()

    if self.op.readd:
      self.context.ReaddNode(self.new_node)
      RedistributeAncillaryFiles(self)
      # make sure we redistribute the config
      self.cfg.Update(self.new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not self.new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
        result.Warn("Node failed to demote itself from master candidate status",
                    self.LogWarning)
    else:
      self.context.AddNode(self.new_node, self.proc.GetECId())
      RedistributeAncillaryFiles(self)


class LUNodeSetParams(LogicalUnit):
  """Modifies the parameters of a node.

  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
      to the node role (as _ROLE_*)
  @cvar _R2F: a dictionary from node role to tuples of flags
  @cvar _FLAGS: a list of attribute names corresponding to the flags

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
  _F2R = {
    (True, False, False): _ROLE_CANDIDATE,
    (False, True, False): _ROLE_DRAINED,
    (False, False, True): _ROLE_OFFLINE,
    (False, False, False): _ROLE_REGULAR,
    }
  _R2F = dict((v, k) for k, v in _F2R.items())
  _FLAGS = ["master_candidate", "drained", "offline"]
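  # Example (illustrative): a node with flags (master_candidate=True,
  # drained=False, offline=False) maps through _F2R to _ROLE_CANDIDATE, and
  # _R2F inverts that mapping, so _R2F[_ROLE_CANDIDATE] == (True, False, False).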

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
                self.op.master_capable, self.op.vm_capable,
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
                self.op.disk_state]
    if all_mods.count(None) == len(all_mods):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # Boolean value that tells us whether we might be demoting from MC
    self.might_demote = (self.op.master_candidate is False or
                         self.op.offline is True or
                         self.op.drained is True or
                         self.op.master_capable is False)

    if self.op.secondary_ip:
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                   " address" % self.op.secondary_ip,
                                   errors.ECODE_INVAL)

    self.lock_all = self.op.auto_promote and self.might_demote
    self.lock_instances = self.op.secondary_ip is not None

  def _InstanceFilter(self, instance):
    """Filter for getting affected instances.

    """
    return (instance.disk_template in constants.DTS_INT_MIRROR and
            self.op.node_uuid in instance.all_nodes)

  def ExpandNames(self):
    if self.lock_all:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,

        # Block allocations when all nodes are locked
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: self.op.node_uuid,
        }

    # Since modifying a node can have severe effects on currently running
    # operations the resource lock is at least acquired in shared mode
    self.needed_locks[locking.LEVEL_NODE_RES] = \
      self.needed_locks[locking.LEVEL_NODE]

    # Get all locks except nodes in shared mode; they are not used for anything
    # but read-only access
    self.share_locks = ShareAll()
    self.share_locks[locking.LEVEL_NODE] = 0
    self.share_locks[locking.LEVEL_NODE_RES] = 0
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 0

    if self.lock_instances:
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(
          self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
    return (nl, nl)

  def CheckPrereq(self):
525
    """Check prerequisites.
526

527
    This only checks the instance list against the existing names.
528

529
    """
530
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
531
    if self.lock_instances:
532
      affected_instances = \
533
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
534

    
535
      # Verify instance locks
536
      owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
537
      wanted_instance_names = frozenset([inst.name for inst in
538
                                         affected_instances.values()])
539
      if wanted_instance_names - owned_instance_names:
540
        raise errors.OpPrereqError("Instances affected by changing node %s's"
541
                                   " secondary IP address have changed since"
542
                                   " locks were acquired, wanted '%s', have"
543
                                   " '%s'; retry the operation" %
544
                                   (node.name,
545
                                    utils.CommaJoin(wanted_instance_names),
546
                                    utils.CommaJoin(owned_instance_names)),
547
                                   errors.ECODE_STATE)
548
    else:
549
      affected_instances = None
550

    
551
    if (self.op.master_candidate is not None or
552
        self.op.drained is not None or
553
        self.op.offline is not None):
554
      # we can't change the master's node flags
555
      if node.uuid == self.cfg.GetMasterNode():
556
        raise errors.OpPrereqError("The master role can be changed"
557
                                   " only via master-failover",
558
                                   errors.ECODE_INVAL)
559

    
560
    if self.op.master_candidate and not node.master_capable:
561
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
562
                                 " it a master candidate" % node.name,
563
                                 errors.ECODE_STATE)
564

    
565
    if self.op.vm_capable is False:
566
      (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
567
      if ipri or isec:
568
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
569
                                   " the vm_capable flag" % node.name,
570
                                   errors.ECODE_STATE)
571

    
572
    if node.master_candidate and self.might_demote and not self.lock_all:
573
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
574
      # check if after removing the current node, we're missing master
575
      # candidates
576
      (mc_remaining, mc_should, _) = \
577
          self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
578
      if mc_remaining < mc_should:
579
        raise errors.OpPrereqError("Not enough master candidates, please"
580
                                   " pass auto promote option to allow"
581
                                   " promotion (--auto-promote or RAPI"
582
                                   " auto_promote=True)", errors.ECODE_STATE)
583

    
584
    self.old_flags = old_flags = (node.master_candidate,
585
                                  node.drained, node.offline)
586
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
587
    self.old_role = old_role = self._F2R[old_flags]
588

    
589
    # Check for ineffective changes
590
    for attr in self._FLAGS:
591
      if getattr(self.op, attr) is False and getattr(node, attr) is False:
592
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
593
        setattr(self.op, attr, None)
594

    
595
    # Past this point, any flag change to False means a transition
596
    # away from the respective state, as only real changes are kept
597

    
598
    # TODO: We might query the real power state if it supports OOB
599
    if SupportsOob(self.cfg, node):
600
      if self.op.offline is False and not (node.powered or
601
                                           self.op.powered is True):
602
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
603
                                    " offline status can be reset") %
604
                                   self.op.node_name, errors.ECODE_STATE)
605
    elif self.op.powered is not None:
606
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
607
                                  " as it does not support out-of-band"
608
                                  " handling") % self.op.node_name,
609
                                 errors.ECODE_STATE)
610

    
611
    # If we're being deofflined/drained, we'll MC ourself if needed
612
    if (self.op.drained is False or self.op.offline is False or
613
        (self.op.master_capable and not node.master_capable)):
614
      if _DecideSelfPromotion(self):
615
        self.op.master_candidate = True
616
        self.LogInfo("Auto-promoting node to master candidate")
617

    
618
    # If we're no longer master capable, we'll demote ourselves from MC
619
    if self.op.master_capable is False and node.master_candidate:
620
      self.LogInfo("Demoting from master candidate")
621
      self.op.master_candidate = False
622

    
623
    # Compute new role
624
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
625
    if self.op.master_candidate:
626
      new_role = self._ROLE_CANDIDATE
627
    elif self.op.drained:
628
      new_role = self._ROLE_DRAINED
629
    elif self.op.offline:
630
      new_role = self._ROLE_OFFLINE
631
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
632
      # False is still in new flags, which means we're un-setting (the
633
      # only) True flag
634
      new_role = self._ROLE_REGULAR
635
    else: # no new flags, nothing, keep old role
636
      new_role = old_role
637

    
638
    self.new_role = new_role
639

    
640
    if old_role == self._ROLE_OFFLINE and new_role != old_role:
641
      # Trying to transition out of offline status
642
      result = self.rpc.call_version([node.uuid])[node.uuid]
643
      if result.fail_msg:
644
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
645
                                   " to report its version: %s" %
646
                                   (node.name, result.fail_msg),
647
                                   errors.ECODE_STATE)
648
      else:
649
        self.LogWarning("Transitioning node from offline to online state"
650
                        " without using re-add. Please make sure the node"
651
                        " is healthy!")
652

    
653
    # When changing the secondary ip, verify if this is a single-homed to
654
    # multi-homed transition or vice versa, and apply the relevant
655
    # restrictions.
656
    if self.op.secondary_ip:
657
      # Ok even without locking, because this can't be changed by any LU
658
      master = self.cfg.GetMasterNodeInfo()
659
      master_singlehomed = master.secondary_ip == master.primary_ip
660
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
661
        if self.op.force and node.uuid == master.uuid:
662
          self.LogWarning("Transitioning from single-homed to multi-homed"
663
                          " cluster; all nodes will require a secondary IP"
664
                          " address")
665
        else:
666
          raise errors.OpPrereqError("Changing the secondary ip on a"
667
                                     " single-homed cluster requires the"
668
                                     " --force option to be passed, and the"
669
                                     " target node to be the master",
670
                                     errors.ECODE_INVAL)
671
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
672
        if self.op.force and node.uuid == master.uuid:
673
          self.LogWarning("Transitioning from multi-homed to single-homed"
674
                          " cluster; secondary IP addresses will have to be"
675
                          " removed")
676
        else:
677
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
678
                                     " same as the primary IP on a multi-homed"
679
                                     " cluster, unless the --force option is"
680
                                     " passed, and the target node is the"
681
                                     " master", errors.ECODE_INVAL)
682

    
683
      assert not (set([inst.name for inst in affected_instances.values()]) -
684
                  self.owned_locks(locking.LEVEL_INSTANCE))
685

    
686
      if node.offline:
687
        if affected_instances:
688
          msg = ("Cannot change secondary IP address: offline node has"
689
                 " instances (%s) configured to use it" %
690
                 utils.CommaJoin(
691
                   [inst.name for inst in affected_instances.values()]))
692
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
693
      else:
694
        # On online nodes, check that no instances are running, and that
695
        # the node has the new ip and we can reach it.
696
        for instance in affected_instances.values():
697
          CheckInstanceState(self, instance, INSTANCE_DOWN,
698
                             msg="cannot change secondary ip")
699

    
700
        _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
701
        if master.uuid != node.uuid:
702
          # check reachability from master secondary ip to new secondary ip
703
          if not netutils.TcpPing(self.op.secondary_ip,
704
                                  constants.DEFAULT_NODED_PORT,
705
                                  source=master.secondary_ip):
706
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
707
                                       " based ping to node daemon port",
708
                                       errors.ECODE_ENVIRON)
709

    
710
    if self.op.ndparams:
711
      new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
712
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
713
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
714
                           "node", "cluster or group")
715
      self.new_ndparams = new_ndparams
716

    
717
    if self.op.hv_state:
718
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
719
                                                node.hv_state_static)
720

    
721
    if self.op.disk_state:
722
      self.new_disk_state = \
723
        MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)
724

    
725
  def Exec(self, feedback_fn):
726
    """Modifies a node.
727

728
    """
729
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
730
    result = []
731

    
732
    if self.op.ndparams:
733
      node.ndparams = self.new_ndparams
734

    
735
    if self.op.powered is not None:
736
      node.powered = self.op.powered
737

    
738
    if self.op.hv_state:
739
      node.hv_state_static = self.new_hv_state
740

    
741
    if self.op.disk_state:
742
      node.disk_state_static = self.new_disk_state
743

    
744
    for attr in ["master_capable", "vm_capable"]:
745
      val = getattr(self.op, attr)
746
      if val is not None:
747
        setattr(node, attr, val)
748
        result.append((attr, str(val)))
749

    
750
    if self.new_role != self.old_role:
751
      # Tell the node to demote itself, if no longer MC and not offline
752
      if self.old_role == self._ROLE_CANDIDATE and \
753
          self.new_role != self._ROLE_OFFLINE:
754
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
755
        if msg:
756
          self.LogWarning("Node failed to demote itself: %s", msg)
757

    
758
      new_flags = self._R2F[self.new_role]
759
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
760
        if of != nf:
761
          result.append((desc, str(nf)))
762
      (node.master_candidate, node.drained, node.offline) = new_flags
763

    
764
      # we locked all nodes, we adjust the CP before updating this node
765
      if self.lock_all:
766
        AdjustCandidatePool(self, [node.uuid])
767

    
768
    if self.op.secondary_ip:
769
      node.secondary_ip = self.op.secondary_ip
770
      result.append(("secondary_ip", self.op.secondary_ip))
771

    
772
    # this will trigger configuration file update, if needed
773
    self.cfg.Update(node, feedback_fn)
774

    
775
    # this will trigger job queue propagation or cleanup if the mc
776
    # flag changed
777
    if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
778
      self.context.ReaddNode(node)
779

    
780
    return result
781

    
782

    
class LUNodePowercycle(NoHooksLU):
  """Powercycles a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    default_hypervisor = self.cfg.GetHypervisorType()
    hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
    result = self.rpc.call_node_powercycle(self.op.node_uuid,
                                           default_hypervisor,
                                           hvparams)
    result.Raise("Failed to schedule the reboot")
    return result.payload


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodePrimaryInstances(cfg, node_uuid):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_uuid):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid in inst.secondary_nodes)


def _GetNodeInstances(cfg, node_uuid):
  """Returns a list of all primary and secondary instances on a node.

  """

  return _GetNodeInstancesInner(cfg, lambda inst: node_uuid in inst.all_nodes)
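# Usage sketch (illustrative, assuming `cfg` is the cluster configuration
# object and `node_uuid` a known node UUID): _GetNodePrimaryInstances(cfg,
# node_uuid) and _GetNodeSecondaryInstances(cfg, node_uuid) return the
# objects.Instance values from cfg.GetAllInstancesInfo() that use the node in
# that role, while _GetNodeInstances(cfg, node_uuid) returns the union of both.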


class LUNodeEvacuate(NoHooksLU):
  """Evacuates instances off a list of nodes.

  """
  REQ_BGL = False

  def CheckArguments(self):
    CheckIAllocatorOrNode(self, "iallocator", "remote_node")

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)
      assert self.op.remote_node

      if self.op.node_uuid == self.op.remote_node_uuid:
        raise errors.OpPrereqError("Can not use evacuated node as a new"
                                   " secondary node", errors.ECODE_INVAL)

      if self.op.mode != constants.NODE_EVAC_SEC:
        raise errors.OpPrereqError("Without the use of an iallocator only"
                                   " secondary instances can be evacuated",
                                   errors.ECODE_INVAL)

    # Declare locks
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

    # Determine nodes (via group) optimistically, needs verification once locks
    # have been acquired
    self.lock_nodes = self._DetermineNodes()

  def _DetermineNodes(self):
    """Gets the list of node UUIDs to operate on.

    """
    if self.op.remote_node is None:
      # Iallocator will choose any node(s) in the same group
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
    else:
      group_nodes = frozenset([self.op.remote_node_uuid])

    # Determine nodes to be locked
    return set([self.op.node_uuid]) | group_nodes

  def _DetermineInstances(self):
    """Builds list of instances to operate on.

    """
    assert self.op.mode in constants.NODE_EVAC_MODES

    if self.op.mode == constants.NODE_EVAC_PRI:
      # Primary instances only
      inst_fn = _GetNodePrimaryInstances
      assert self.op.remote_node is None, \
        "Evacuating primary instances requires iallocator"
    elif self.op.mode == constants.NODE_EVAC_SEC:
      # Secondary instances only
      inst_fn = _GetNodeSecondaryInstances
    else:
      # All instances
      assert self.op.mode == constants.NODE_EVAC_ALL
      inst_fn = _GetNodeInstances
      # TODO: In 2.6, change the iallocator interface to take an evacuation mode
      # per instance
      raise errors.OpPrereqError("Due to an issue with the iallocator"
                                 " interface it is not possible to evacuate"
                                 " all instances at once; specify explicitly"
                                 " whether to evacuate primary or secondary"
                                 " instances",
                                 errors.ECODE_INVAL)

    return inst_fn(self.cfg, self.op.node_uuid)

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        set(i.name for i in self._DetermineInstances())

    elif level == locking.LEVEL_NODEGROUP:
      # Lock node groups for all potential target nodes optimistically, needs
      # verification once nodes have been acquired
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)

    elif level == locking.LEVEL_NODE:
      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes

  def CheckPrereq(self):
    # Verify locks
    owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)

    need_nodes = self._DetermineNodes()

    if not owned_nodes.issuperset(need_nodes):
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
                                 " locks were acquired, current nodes are"
                                 " '%s', used to be '%s'; retry the"
                                 " operation" %
                                 (self.op.node_name,
                                  utils.CommaJoin(need_nodes),
                                  utils.CommaJoin(owned_nodes)),
                                 errors.ECODE_STATE)

    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
    if owned_groups != wanted_groups:
      raise errors.OpExecError("Node groups changed since locks were acquired,"
                               " current groups are '%s', used to be '%s';"
                               " retry the operation" %
                               (utils.CommaJoin(wanted_groups),
                                utils.CommaJoin(owned_groups)))

    # Determine affected instances
    self.instances = self._DetermineInstances()
    self.instance_names = [i.name for i in self.instances]

    if set(self.instance_names) != owned_instance_names:
      raise errors.OpExecError("Instances on node '%s' changed since locks"
                               " were acquired, current instances are '%s',"
                               " used to be '%s'; retry the operation" %
                               (self.op.node_name,
                                utils.CommaJoin(self.instance_names),
                                utils.CommaJoin(owned_instance_names)))

    if self.instance_names:
      self.LogInfo("Evacuating instances from node '%s': %s",
                   self.op.node_name,
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
    else:
      self.LogInfo("No instances to evacuate from node '%s'",
                   self.op.node_name)

    if self.op.remote_node is not None:
      for i in self.instances:
        if i.primary_node == self.op.remote_node_uuid:
          raise errors.OpPrereqError("Node %s is the primary node of"
                                     " instance %s, cannot use it as"
                                     " secondary" %
                                     (self.op.remote_node, i.name),
                                     errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)

    if not self.instance_names:
      # No instances to evacuate
      jobs = []

    elif self.op.iallocator is not None:
      # TODO: Implement relocation to other group
      req = iallocator.IAReqNodeEvac(evac_mode=self.op.mode,
                                     instances=list(self.instance_names))
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)

      ial.Run(self.op.iallocator)

      if not ial.success:
        raise errors.OpPrereqError("Can't compute node evacuation using"
                                   " iallocator '%s': %s" %
                                   (self.op.iallocator, ial.info),
                                   errors.ECODE_NORES)

      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)

    elif self.op.remote_node is not None:
      assert self.op.mode == constants.NODE_EVAC_SEC
      jobs = [
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
                                        remote_node=self.op.remote_node,
                                        disks=[],
                                        mode=constants.REPLACE_DISK_CHG,
                                        early_release=self.op.early_release)]
        for instance_name in self.instance_names]

    else:
      raise errors.ProgrammerError("No iallocator or remote node")

    return ResultWithJobs(jobs)


class LUNodeMigrate(LogicalUnit):
  """Migrate all instances from a node.

  """
  HPATH = "node-migrate"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False

  def CheckArguments(self):
    pass

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    return {
      "NODE_NAME": self.op.node_name,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()]
    return (nl, nl)

  def CheckPrereq(self):
    pass

  def Exec(self, feedback_fn):
    # Prepare jobs for migration instances
    jobs = [
      [opcodes.OpInstanceMigrate(
        instance_name=inst.name,
        mode=self.op.mode,
        live=self.op.live,
        iallocator=self.op.iallocator,
        target_node=self.op.target_node,
        allow_runtime_changes=self.op.allow_runtime_changes,
        ignore_ipolicy=self.op.ignore_ipolicy)]
      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]

    # TODO: Run iallocator in this opcode and pass correct placement options to
    # OpInstanceMigrate. Since other jobs can modify the cluster between
    # running the iallocator and the actual migration, a good consistency model
    # will have to be found.

    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset([self.op.node_uuid]))

    return ResultWithJobs(jobs)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]

  return []
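# Illustrative behaviour: for storage_type == constants.ST_FILE the result is
# [[<file storage dir>, <shared file storage dir>]]; for every other storage
# type the argument list is simply empty.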


class LUNodeModifyStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_uuid,
      }

  def Exec(self, feedback_fn):
    """Modifies a storage volume on a node.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_uuid,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))

class NodeQuery(QueryBase):
  FIELDS = query.NODE_FIELDS

  def ExpandNames(self, lu):
    lu.needed_locks = {}
    lu.share_locks = ShareAll()

    if self.names:
      (self.wanted, _) = GetWantedNodes(lu, self.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = (self.use_locking and
                       query.NQ_LIVE in self.requested_data)

    if self.do_locking:
      # If any non-static field is requested we need to lock the nodes
      lu.needed_locks[locking.LEVEL_NODE] = self.wanted
      lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    all_info = lu.cfg.GetAllNodesInfo()

    node_uuids = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)

    # Gather data as requested
    if query.NQ_LIVE in self.requested_data:
      # filter out non-vm_capable nodes
      toquery_node_uuids = [node.uuid for node in all_info.values()
                            if node.vm_capable and node.uuid in node_uuids]
      default_template = lu.cfg.GetClusterInfo().enabled_disk_templates[0]
      raw_storage_units = utils.storage.GetStorageUnits(
          lu.cfg, [default_template])
      storage_units = rpc.PrepareStorageUnitsForNodes(
          lu.cfg, raw_storage_units, toquery_node_uuids)
      default_hypervisor = lu.cfg.GetHypervisorType()
      hvparams = lu.cfg.GetClusterInfo().hvparams[default_hypervisor]
      hvspecs = [(default_hypervisor, hvparams)]
      node_data = lu.rpc.call_node_info(toquery_node_uuids, storage_units,
                                        hvspecs)
      live_data = dict(
          (uuid, rpc.MakeLegacyNodeInfo(nresult.payload, default_template))
          for (uuid, nresult) in node_data.items()
          if not nresult.fail_msg and nresult.payload)
    else:
      live_data = None

    if query.NQ_INST in self.requested_data:
      node_to_primary = dict([(uuid, set()) for uuid in node_uuids])
      node_to_secondary = dict([(uuid, set()) for uuid in node_uuids])

      inst_data = lu.cfg.GetAllInstancesInfo()
      inst_uuid_to_inst_name = {}

      for inst in inst_data.values():
        inst_uuid_to_inst_name[inst.uuid] = inst.name
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.uuid)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.uuid)
    else:
      node_to_primary = None
      node_to_secondary = None
      inst_uuid_to_inst_name = None

    if query.NQ_OOB in self.requested_data:
      oob_support = dict((uuid, bool(SupportsOob(lu.cfg, node)))
                         for uuid, node in all_info.iteritems())
    else:
      oob_support = None

    if query.NQ_GROUP in self.requested_data:
      groups = lu.cfg.GetAllNodeGroupsInfo()
    else:
      groups = {}

    return query.NodeQueryData([all_info[uuid] for uuid in node_uuids],
                               live_data, lu.cfg.GetMasterNode(),
                               node_to_primary, node_to_secondary,
                               inst_uuid_to_inst_name, groups, oob_support,
                               lu.cfg.GetClusterInfo())


class LUNodeQuery(NoHooksLU):
  """Logical unit for querying nodes.

  """
  # pylint: disable=W0142
  REQ_BGL = False

  def CheckArguments(self):
    self.nq = NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
                        self.op.output_fields, self.op.use_locking)

  def ExpandNames(self):
    self.nq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.nq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    return self.nq.OldStyleQuery(self)


def _CheckOutputFields(fields, selected):
  """Checks whether all selected fields are valid according to fields.

  @type fields: L{utils.FieldSet}
  @param fields: fields set
  @type selected: L{utils.FieldSet}
  @param selected: fields set

  """
  delta = fields.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)
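# Usage sketch (illustrative): LUNodeQueryvols below calls
# _CheckOutputFields(utils.FieldSet(constants.VF_NODE, ...),
# self.op.output_fields); any requested field outside the given FieldSet ends
# up in `delta` and aborts the opcode with ECODE_INVAL.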
1290

    
1291

    
1292
class LUNodeQueryvols(NoHooksLU):
1293
  """Logical unit for getting volumes on node(s).
1294

1295
  """
1296
  REQ_BGL = False
1297

    
1298
  def CheckArguments(self):
1299
    _CheckOutputFields(utils.FieldSet(constants.VF_NODE, constants.VF_PHYS,
1300
                                      constants.VF_VG, constants.VF_NAME,
1301
                                      constants.VF_SIZE, constants.VF_INSTANCE),
1302
                       self.op.output_fields)
1303

    
1304
  def ExpandNames(self):
1305
    self.share_locks = ShareAll()
1306

    
1307
    if self.op.nodes:
1308
      self.needed_locks = {
1309
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
1310
        }
1311
    else:
1312
      self.needed_locks = {
1313
        locking.LEVEL_NODE: locking.ALL_SET,
1314
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1315
        }
1316

    
1317
  def Exec(self, feedback_fn):
1318
    """Computes the list of nodes and their attributes.
1319

1320
    """
1321
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
1322
    volumes = self.rpc.call_node_volumes(node_uuids)
1323

    
1324
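    # Map (node UUID, "vg/lv_name") pairs to the owning instance; used below
    # to resolve the VF_INSTANCE output field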
    ilist = self.cfg.GetAllInstancesInfo()
    vol2inst = MapInstanceLvsToNodes(ilist.values())

    output = []
    for node_uuid in node_uuids:
      nresult = volumes[node_uuid]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue

      node_vols = sorted(nresult.payload,
                         key=operator.itemgetter(constants.VF_DEV))

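      # Build one output row per volume, in the order of the requested fields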
      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == constants.VF_NODE:
            val = self.cfg.GetNodeName(node_uuid)
          elif field == constants.VF_PHYS:
            val = vol[constants.VF_DEV]
          elif field == constants.VF_VG:
            val = vol[constants.VF_VG]
          elif field == constants.VF_NAME:
            val = vol[constants.VF_NAME]
          elif field == constants.VF_SIZE:
            val = int(float(vol[constants.VF_SIZE]))
          elif field == constants.VF_INSTANCE:
            inst = vol2inst.get((node_uuid, vol[constants.VF_VG] + "/" +
                                 vol[constants.VF_NAME]), None)
            if inst is not None:
              val = inst.name
            else:
              val = "-"
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def _DetermineStorageType(self):
    """Determines the default storage type of the cluster.

    """
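    # Derived from the first enabled disk template; e.g. a cluster whose
    # first enabled template is "plain" is assumed to map to the LVM
    # volume-group storage type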
    enabled_disk_templates = self.cfg.GetClusterInfo().enabled_disk_templates
    default_storage_type = \
        constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[enabled_disk_templates[0]]
    return default_storage_type

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.storage_type:
      CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()
      if self.storage_type not in constants.STS_REPORT:
        raise errors.OpPrereqError(
            "Storage reporting for storage type '%s' is not supported. Please"
            " use the --storage-type option to specify one of the supported"
            " storage types (%s) or set the default disk template to one that"
            " supports storage reporting." %
            (self.storage_type, utils.CommaJoin(constants.STS_REPORT)))

  def Exec(self, feedback_fn):
    """Computes the list of storage units and their attributes.

    """
    if self.op.storage_type:
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()

    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

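    # Remember each field's column index in the rows returned by the nodes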
    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

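    # Query the storage backend of all selected nodes in a single RPC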
    st_args = _GetStorageTypeArgs(self.cfg, self.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      nresult = data[node_uuid]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s",
                        node_name, msg)
        continue

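      # Index the result rows by unit name so they can be emitted in sorted
      # order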
      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node_name
          elif field == constants.SF_TYPE:
            val = self.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result


class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_uuid)
    except ValueError:
      pass
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    assert node is not None

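    # The master node cannot be removed; the master role has to be failed
    # over to another node first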
    masternode = self.cfg.GetMasterNode()
    if node.uuid == masternode:
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)

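    # Refuse removal while the node still hosts any instance, either as
    # primary or as secondary node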
    for _, instance in self.cfg.GetAllInstancesInfo().items():
      if node.uuid in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove it first" % instance.name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    logging.info("Stopping the node daemon and removing configs from node %s",
                 self.node.name)

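    # Whether Ganeti manages the SSH setup; passed on to the node-leave RPC
    # below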
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # Promote nodes to master candidate as needed
    AdjustCandidatePool(self, exceptions=[self.node.uuid])
    self.context.RemoveNode(self.node)

    # Run post hooks on the node before it's removed
    RunPostHook(self, self.node.name)

    # we have to call this by name rather than by UUID, as the node is no longer
    # in the config
    result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node_uuid = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node_uuid,
                                              constants.ETC_HOSTS_REMOVE,
                                              self.node.name, None)
      result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)


class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

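    # Only storage types supporting the "fix consistency" operation can be
    # repaired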
    if (constants.SO_FIX_CONSISTENCY not in
        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
      raise errors.OpPrereqError("Storage units of type '%s' cannot be"
                                 " repaired" % storage_type,
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def _CheckFaultyDisks(self, instance, node_uuid):
    """Ensure faulty disks abort the opcode or at least warn."""
    try:
      if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                 node_uuid, True):
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                   " node '%s'" %
                                   (instance.name,
                                    self.cfg.GetNodeName(node_uuid)),
                                   errors.ECODE_STATE)
    except errors.OpPrereqError, err:
      if self.op.ignore_consistency:
        self.LogWarning(str(err.args[0]))
      else:
        raise

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

    # Check whether any instance on this node has faulty disks
    for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
      if not inst.disks_active:
        continue
      check_nodes = set(inst.all_nodes)
      check_nodes.discard(self.op.node_uuid)
      for inst_node_uuid in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_uuid)

  def Exec(self, feedback_fn):
    feedback_fn("Repairing storage unit '%s' on %s ..." %
                (self.op.name, self.op.node_name))

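    # Delegate the actual repair to the node via the storage_execute RPC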
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_execute(self.op.node_uuid,
                                           self.op.storage_type, st_args,
                                           self.op.name,
                                           constants.SO_FIX_CONSISTENCY)
    result.Raise("Failed to repair storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))