#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with nodes."""

import logging
import operator

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import qlang
from ganeti import query
from ganeti import rpc
from ganeti import utils
from ganeti.masterd import iallocator

from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
  ResultWithJobs
from ganeti.cmdlib.common import CheckParamsNotGlobal, \
  MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  IsExclusiveStorageEnabledNode, CheckNodePVs, \
  RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
  CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
  AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
  GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
  FindFaultyInstanceDisks, CheckStorageTypeEnabled


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max by one, so:
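  # (for example: with candidate_pool_size=10, mc_now=3 and mc_should=5,
  # the target becomes min(5 + 1, 10) = 6 and 3 < 6, so we promote ourselves)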
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should


def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: L{objects.Node}
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip,
  and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  # this can be called with a new node, which has no UUID yet, so perform the
  # RPC call using its name
  result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node.name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)


class LUNodeAdd(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _NFLAGS = ["master_capable", "vm_capable"]

  def CheckArguments(self):
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
    # validate/normalize the node name
    self.hostname = netutils.GetHostname(name=self.op.node_name,
                                         family=self.primary_ip_family)
    self.op.node_name = self.hostname.name

    if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
      raise errors.OpPrereqError("Cannot readd the master node",
                                 errors.ECODE_STATE)

    if self.op.readd and self.op.group:
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                 " being readded", errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    hook_nodes = self.cfg.GetNodeList()
    new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
    if new_node_info is not None:
      # Exclude added node
      hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))

    # add the new node as post hook node by name; it does not have a UUID yet
    return (hook_nodes, hook_nodes)

  def PreparePostHookNodes(self, post_hook_node_uuids):
    return post_hook_node_uuids + [self.new_node.uuid]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.hostname.name
    self.op.primary_ip = self.hostname.ip
    if self.op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using an IPv6 primary address, a valid"
                                   " IPv4 address must be given as secondary",
                                   errors.ECODE_INVAL)
      self.op.secondary_ip = self.op.primary_ip

    secondary_ip = self.op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip, errors.ECODE_INVAL)

    existing_node_info = self.cfg.GetNodeInfoByName(node_name)
    if not self.op.readd and existing_node_info is not None:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node_name, errors.ECODE_EXISTS)
    elif self.op.readd and existing_node_info is None:
      raise errors.OpPrereqError("Node %s is not in the configuration" %
                                 node_name, errors.ECODE_NOENT)

    self.changed_primary_ip = False

    for existing_node in self.cfg.GetAllNodesInfo().values():
      if self.op.readd and node_name == existing_node.name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != self.op.primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == self.op.primary_ip or
          existing_node.secondary_ip == self.op.primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # After this 'if' block, None is no longer a valid value for the
    # _capable op attributes
    if self.op.readd:
      assert existing_node_info is not None, \
        "Can't retrieve locked node %s" % node_name
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, getattr(existing_node_info, attr))
    else:
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, True)

    if self.op.readd and not self.op.vm_capable:
      pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % node_name,
                                   errors.ECODE_STATE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = self.cfg.GetMasterNodeInfo()
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == self.op.primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                              source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [existing_node_info.uuid]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False

    if self.op.readd:
      self.new_node = existing_node_info
    else:
      node_group = self.cfg.LookupNodeGroup(self.op.group)
      self.new_node = objects.Node(name=node_name,
                                   primary_ip=self.op.primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=node_group, ndparams={})

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)

    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
    #       it a property on the base class.
    rpcrunner = rpc.DnsOnlyRunner()
    result = rpcrunner.call_version([node_name])[node_name]
    result.Raise("Can't get version information from node %s" % node_name)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node_name, result.payload)
    else:
      raise errors.OpPrereqError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.payload),
                                 errors.ECODE_ENVIRON)

    vg_name = self.cfg.GetVGName()
    if vg_name is not None:
      vparams = {constants.NV_PVLIST: [vg_name]}
      excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
      cname = self.cfg.GetClusterName()
      result = rpcrunner.call_node_verify_light(
          [node_name], vparams, cname,
          self.cfg.GetClusterInfo().hvparams)[node_name]
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
      if errmsgs:
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)

  def _InitOpenVSwitch(self):
    filled_ndparams = self.cfg.GetClusterInfo().FillND(
      self.new_node, self.cfg.GetNodeGroup(self.new_node.group))

    ovs = filled_ndparams.get(constants.ND_OVS, None)
    ovs_name = filled_ndparams.get(constants.ND_OVS_NAME, None)
    ovs_link = filled_ndparams.get(constants.ND_OVS_LINK, None)

    if ovs:
      if not ovs_link:
        self.LogInfo("No physical interface for OpenvSwitch was given."
                     " OpenvSwitch will not have an outside connection. This"
                     " might not be what you want.")

      result = self.rpc.call_node_configure_ovs(
                 self.new_node.name, ovs_name, ovs_link)
      result.Raise("Failed to initialize OpenVSwitch on new node")

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # We are adding a new node, so we assume it's powered
    self.new_node.powered = True

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      self.new_node.offline = False
      self.new_node.drained = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      self.new_node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        self.new_node.primary_ip = self.op.primary_ip

    # copy the master/vm_capable flags
    for attr in self._NFLAGS:
      setattr(self.new_node, attr, getattr(self.op, attr))

    # notify the user about any possible mc promotion
    if self.new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    if self.op.ndparams:
      self.new_node.ndparams = self.op.ndparams
    else:
      self.new_node.ndparams = {}

    if self.op.hv_state:
      self.new_node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.new_node.disk_state_static = self.new_disk_state

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(
                 master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
                 self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    if self.new_node.secondary_ip != self.new_node.primary_ip:
      _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
                               False)

    node_verifier_uuids = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: ([self.new_node.name], {}),
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(
               node_verifier_uuids, node_verify_param,
               self.cfg.GetClusterName(),
               self.cfg.GetClusterInfo().hvparams)
    for verifier in node_verifier_uuids:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed")

    self._InitOpenVSwitch()

    if self.op.readd:
      self.context.ReaddNode(self.new_node)
      RedistributeAncillaryFiles(self)
      # make sure we redistribute the config
      self.cfg.Update(self.new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not self.new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
        result.Warn("Node failed to demote itself from master candidate status",
                    self.LogWarning)
    else:
      self.context.AddNode(self.new_node, self.proc.GetECId())
      RedistributeAncillaryFiles(self)


class LUNodeSetParams(LogicalUnit):
418
  """Modifies the parameters of a node.
419

420
  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
421
      to the node role (as _ROLE_*)
422
  @cvar _R2F: a dictionary from node role to tuples of flags
423
  @cvar _FLAGS: a list of attribute names corresponding to the flags
424

425
  """
426
  HPATH = "node-modify"
427
  HTYPE = constants.HTYPE_NODE
428
  REQ_BGL = False
429
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
430
  _F2R = {
431
    (True, False, False): _ROLE_CANDIDATE,
432
    (False, True, False): _ROLE_DRAINED,
433
    (False, False, True): _ROLE_OFFLINE,
434
    (False, False, False): _ROLE_REGULAR,
435
    }
436
  _R2F = dict((v, k) for k, v in _F2R.items())
437
  _FLAGS = ["master_candidate", "drained", "offline"]
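  # e.g. a node that is drained but neither a master candidate nor offline
  # maps to (False, True, False) -> _ROLE_DRAINED; _R2F gives the reverse
  # lookup from a role back to its flag tuple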

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
                self.op.master_capable, self.op.vm_capable,
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
                self.op.disk_state]
    if all_mods.count(None) == len(all_mods):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # Boolean value that tells us whether we might be demoting from MC
    self.might_demote = (self.op.master_candidate is False or
                         self.op.offline is True or
                         self.op.drained is True or
                         self.op.master_capable is False)

    if self.op.secondary_ip:
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                   " address" % self.op.secondary_ip,
                                   errors.ECODE_INVAL)

    self.lock_all = self.op.auto_promote and self.might_demote
    self.lock_instances = self.op.secondary_ip is not None

  def _InstanceFilter(self, instance):
    """Filter for getting affected instances.

    """
    return (instance.disk_template in constants.DTS_INT_MIRROR and
            self.op.node_uuid in instance.all_nodes)

  def ExpandNames(self):
    if self.lock_all:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,

        # Block allocations when all nodes are locked
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: self.op.node_uuid,
        }

    # Since modifying a node can have severe effects on currently running
    # operations the resource lock is at least acquired in shared mode
    self.needed_locks[locking.LEVEL_NODE_RES] = \
      self.needed_locks[locking.LEVEL_NODE]

    # Get all locks except nodes in shared mode; they are not used for anything
    # but read-only access
    self.share_locks = ShareAll()
    self.share_locks[locking.LEVEL_NODE] = 0
    self.share_locks[locking.LEVEL_NODE_RES] = 0
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 0

    if self.lock_instances:
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(
          self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    if self.lock_instances:
      affected_instances = \
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)

      # Verify instance locks
      owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
      wanted_instance_names = frozenset([inst.name for inst in
                                         affected_instances.values()])
      if wanted_instance_names - owned_instance_names:
        raise errors.OpPrereqError("Instances affected by changing node %s's"
                                   " secondary IP address have changed since"
                                   " locks were acquired, wanted '%s', have"
                                   " '%s'; retry the operation" %
                                   (node.name,
                                    utils.CommaJoin(wanted_instance_names),
                                    utils.CommaJoin(owned_instance_names)),
                                   errors.ECODE_STATE)
    else:
      affected_instances = None

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if node.uuid == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via master-failover",
                                   errors.ECODE_INVAL)

    if self.op.master_candidate and not node.master_capable:
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
                                 " it a master candidate" % node.name,
                                 errors.ECODE_STATE)

    if self.op.vm_capable is False:
      (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
      if ipri or isec:
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
                                   " the vm_capable flag" % node.name,
                                   errors.ECODE_STATE)

    if node.master_candidate and self.might_demote and not self.lock_all:
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
      # check if after removing the current node, we're missing master
      # candidates
      (mc_remaining, mc_should, _) = \
          self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
      if mc_remaining < mc_should:
        raise errors.OpPrereqError("Not enough master candidates, please"
                                   " pass auto promote option to allow"
                                   " promotion (--auto-promote or RAPI"
                                   " auto_promote=True)", errors.ECODE_STATE)

    self.old_flags = old_flags = (node.master_candidate,
                                  node.drained, node.offline)
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
    self.old_role = old_role = self._F2R[old_flags]

    # Check for ineffective changes
    for attr in self._FLAGS:
      if getattr(self.op, attr) is False and getattr(node, attr) is False:
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
        setattr(self.op, attr, None)

    # Past this point, any flag change to False means a transition
    # away from the respective state, as only real changes are kept

    # TODO: We might query the real power state if it supports OOB
    if SupportsOob(self.cfg, node):
      if self.op.offline is False and not (node.powered or
                                           self.op.powered is True):
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
                                    " offline status can be reset") %
                                   self.op.node_name, errors.ECODE_STATE)
    elif self.op.powered is not None:
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
                                  " as it does not support out-of-band"
                                  " handling") % self.op.node_name,
                                 errors.ECODE_STATE)

    # If we're being deofflined/drained, we'll MC ourself if needed
    if (self.op.drained is False or self.op.offline is False or
        (self.op.master_capable and not node.master_capable)):
      if _DecideSelfPromotion(self):
        self.op.master_candidate = True
        self.LogInfo("Auto-promoting node to master candidate")

    # If we're no longer master capable, we'll demote ourselves from MC
    if self.op.master_capable is False and node.master_candidate:
      self.LogInfo("Demoting from master candidate")
      self.op.master_candidate = False

    # Compute new role
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
    if self.op.master_candidate:
      new_role = self._ROLE_CANDIDATE
    elif self.op.drained:
      new_role = self._ROLE_DRAINED
    elif self.op.offline:
      new_role = self._ROLE_OFFLINE
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
      # False is still in new flags, which means we're un-setting (the
      # only) True flag
      new_role = self._ROLE_REGULAR
    else: # no new flags, nothing, keep old role
      new_role = old_role

    self.new_role = new_role

    if old_role == self._ROLE_OFFLINE and new_role != old_role:
      # Trying to transition out of offline status
      result = self.rpc.call_version([node.uuid])[node.uuid]
      if result.fail_msg:
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
                                   " to report its version: %s" %
                                   (node.name, result.fail_msg),
                                   errors.ECODE_STATE)
      else:
        self.LogWarning("Transitioning node from offline to online state"
                        " without using re-add. Please make sure the node"
                        " is healthy!")

    # When changing the secondary ip, verify if this is a single-homed to
    # multi-homed transition or vice versa, and apply the relevant
    # restrictions.
    if self.op.secondary_ip:
      # Ok even without locking, because this can't be changed by any LU
      master = self.cfg.GetMasterNodeInfo()
      master_singlehomed = master.secondary_ip == master.primary_ip
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from single-homed to multi-homed"
                          " cluster; all nodes will require a secondary IP"
                          " address")
        else:
          raise errors.OpPrereqError("Changing the secondary ip on a"
                                     " single-homed cluster requires the"
                                     " --force option to be passed, and the"
                                     " target node to be the master",
                                     errors.ECODE_INVAL)
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from multi-homed to single-homed"
                          " cluster; secondary IP addresses will have to be"
                          " removed")
        else:
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
                                     " same as the primary IP on a multi-homed"
                                     " cluster, unless the --force option is"
                                     " passed, and the target node is the"
                                     " master", errors.ECODE_INVAL)

      assert not (set([inst.name for inst in affected_instances.values()]) -
                  self.owned_locks(locking.LEVEL_INSTANCE))

      if node.offline:
        if affected_instances:
          msg = ("Cannot change secondary IP address: offline node has"
                 " instances (%s) configured to use it" %
                 utils.CommaJoin(
                   [inst.name for inst in affected_instances.values()]))
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
      else:
        # On online nodes, check that no instances are running, and that
        # the node has the new ip and we can reach it.
        for instance in affected_instances.values():
          CheckInstanceState(self, instance, INSTANCE_DOWN,
                             msg="cannot change secondary ip")

        _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
        if master.uuid != node.uuid:
          # check reachability from master secondary ip to new secondary ip
          if not netutils.TcpPing(self.op.secondary_ip,
                                  constants.DEFAULT_NODED_PORT,
                                  source=master.secondary_ip):
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                       " based ping to node daemon port",
                                       errors.ECODE_ENVIRON)

    if self.op.ndparams:
      new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")
      self.new_ndparams = new_ndparams

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                                node.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    result = []

    if self.op.ndparams:
      node.ndparams = self.new_ndparams

    if self.op.powered is not None:
      node.powered = self.op.powered

    if self.op.hv_state:
      node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      node.disk_state_static = self.new_disk_state

    for attr in ["master_capable", "vm_capable"]:
      val = getattr(self.op, attr)
      if val is not None:
        setattr(node, attr, val)
        result.append((attr, str(val)))

    if self.new_role != self.old_role:
      # Tell the node to demote itself, if no longer MC and not offline
      if self.old_role == self._ROLE_CANDIDATE and \
          self.new_role != self._ROLE_OFFLINE:
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s", msg)

      new_flags = self._R2F[self.new_role]
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
        if of != nf:
          result.append((desc, str(nf)))
      (node.master_candidate, node.drained, node.offline) = new_flags

      # we locked all nodes, we adjust the CP before updating this node
      if self.lock_all:
        AdjustCandidatePool(self, [node.uuid])

    if self.op.secondary_ip:
      node.secondary_ip = self.op.secondary_ip
      result.append(("secondary_ip", self.op.secondary_ip))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)

    # this will trigger job queue propagation or cleanup if the mc
    # flag changed
    if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
      self.context.ReaddNode(node)

    return result


class LUNodePowercycle(NoHooksLU):
  """Powercycles a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    default_hypervisor = self.cfg.GetHypervisorType()
    hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
    result = self.rpc.call_node_powercycle(self.op.node_uuid,
                                           default_hypervisor,
                                           hvparams)
    result.Raise("Failed to schedule the reboot")
    return result.payload


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodePrimaryInstances(cfg, node_uuid):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_uuid):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid in inst.secondary_nodes)


def _GetNodeInstances(cfg, node_uuid):
  """Returns a list of all primary and secondary instances on a node.

  """

  return _GetNodeInstancesInner(cfg, lambda inst: node_uuid in inst.all_nodes)


class LUNodeEvacuate(NoHooksLU):
  """Evacuates instances off a list of nodes.

  """
  REQ_BGL = False

  def CheckArguments(self):
    CheckIAllocatorOrNode(self, "iallocator", "remote_node")

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)
      assert self.op.remote_node

      if self.op.node_uuid == self.op.remote_node_uuid:
        raise errors.OpPrereqError("Can not use evacuated node as a new"
                                   " secondary node", errors.ECODE_INVAL)

      if self.op.mode != constants.NODE_EVAC_SEC:
        raise errors.OpPrereqError("Without the use of an iallocator only"
                                   " secondary instances can be evacuated",
                                   errors.ECODE_INVAL)

    # Declare locks
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

    # Determine nodes (via group) optimistically, needs verification once locks
    # have been acquired
    self.lock_nodes = self._DetermineNodes()

  def _DetermineNodes(self):
    """Gets the list of node UUIDs to operate on.

    """
    if self.op.remote_node is None:
      # Iallocator will choose any node(s) in the same group
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
    else:
      group_nodes = frozenset([self.op.remote_node_uuid])

    # Determine nodes to be locked
    return set([self.op.node_uuid]) | group_nodes

  def _DetermineInstances(self):
    """Builds list of instances to operate on.

    """
    assert self.op.mode in constants.NODE_EVAC_MODES

    if self.op.mode == constants.NODE_EVAC_PRI:
      # Primary instances only
      inst_fn = _GetNodePrimaryInstances
      assert self.op.remote_node is None, \
        "Evacuating primary instances requires iallocator"
    elif self.op.mode == constants.NODE_EVAC_SEC:
      # Secondary instances only
      inst_fn = _GetNodeSecondaryInstances
    else:
      # All instances
      assert self.op.mode == constants.NODE_EVAC_ALL
      inst_fn = _GetNodeInstances
      # TODO: In 2.6, change the iallocator interface to take an evacuation mode
      # per instance
      raise errors.OpPrereqError("Due to an issue with the iallocator"
                                 " interface it is not possible to evacuate"
                                 " all instances at once; specify explicitly"
                                 " whether to evacuate primary or secondary"
                                 " instances",
                                 errors.ECODE_INVAL)

    return inst_fn(self.cfg, self.op.node_uuid)

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        set(i.name for i in self._DetermineInstances())

    elif level == locking.LEVEL_NODEGROUP:
      # Lock node groups for all potential target nodes optimistically, needs
      # verification once nodes have been acquired
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)

    elif level == locking.LEVEL_NODE:
      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes

  def CheckPrereq(self):
    # Verify locks
    owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)

    need_nodes = self._DetermineNodes()

    if not owned_nodes.issuperset(need_nodes):
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
                                 " locks were acquired, current nodes are"
                                 " '%s', used to be '%s'; retry the"
                                 " operation" %
                                 (self.op.node_name,
                                  utils.CommaJoin(need_nodes),
                                  utils.CommaJoin(owned_nodes)),
                                 errors.ECODE_STATE)

    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
    if owned_groups != wanted_groups:
      raise errors.OpExecError("Node groups changed since locks were acquired,"
                               " current groups are '%s', used to be '%s';"
                               " retry the operation" %
                               (utils.CommaJoin(wanted_groups),
                                utils.CommaJoin(owned_groups)))

    # Determine affected instances
    self.instances = self._DetermineInstances()
    self.instance_names = [i.name for i in self.instances]

    if set(self.instance_names) != owned_instance_names:
      raise errors.OpExecError("Instances on node '%s' changed since locks"
                               " were acquired, current instances are '%s',"
                               " used to be '%s'; retry the operation" %
                               (self.op.node_name,
                                utils.CommaJoin(self.instance_names),
                                utils.CommaJoin(owned_instance_names)))

    if self.instance_names:
      self.LogInfo("Evacuating instances from node '%s': %s",
                   self.op.node_name,
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
    else:
      self.LogInfo("No instances to evacuate from node '%s'",
                   self.op.node_name)

    if self.op.remote_node is not None:
      for i in self.instances:
        if i.primary_node == self.op.remote_node_uuid:
          raise errors.OpPrereqError("Node %s is the primary node of"
                                     " instance %s, cannot use it as"
                                     " secondary" %
                                     (self.op.remote_node, i.name),
                                     errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)

    if not self.instance_names:
      # No instances to evacuate
      jobs = []

    elif self.op.iallocator is not None:
      # TODO: Implement relocation to other group
      req = iallocator.IAReqNodeEvac(evac_mode=self.op.mode,
                                     instances=list(self.instance_names))
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)

      ial.Run(self.op.iallocator)

      if not ial.success:
        raise errors.OpPrereqError("Can't compute node evacuation using"
                                   " iallocator '%s': %s" %
                                   (self.op.iallocator, ial.info),
                                   errors.ECODE_NORES)

      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)

    elif self.op.remote_node is not None:
      assert self.op.mode == constants.NODE_EVAC_SEC
      jobs = [
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
                                        remote_node=self.op.remote_node,
                                        disks=[],
                                        mode=constants.REPLACE_DISK_CHG,
                                        early_release=self.op.early_release)]
        for instance_name in self.instance_names]

    else:
      raise errors.ProgrammerError("No iallocator or remote node")

    return ResultWithJobs(jobs)


class LUNodeMigrate(LogicalUnit):
  """Migrate all instances from a node.

  """
  HPATH = "node-migrate"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False

  def CheckArguments(self):
    pass

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    return {
      "NODE_NAME": self.op.node_name,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()]
    return (nl, nl)

  def CheckPrereq(self):
    pass

  def Exec(self, feedback_fn):
    # Prepare jobs for migrating instances
    jobs = [
      [opcodes.OpInstanceMigrate(
        instance_name=inst.name,
        mode=self.op.mode,
        live=self.op.live,
        iallocator=self.op.iallocator,
        target_node=self.op.target_node,
        allow_runtime_changes=self.op.allow_runtime_changes,
        ignore_ipolicy=self.op.ignore_ipolicy)]
      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]

    # TODO: Run iallocator in this opcode and pass correct placement options to
    # OpInstanceMigrate. Since other jobs can modify the cluster between
    # running the iallocator and the actual migration, a good consistency model
    # will have to be found.

    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset([self.op.node_uuid]))

    return ResultWithJobs(jobs)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
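    # (typically something like
    # [["/srv/ganeti/file-storage", "/srv/ganeti/shared-file-storage"]],
    # depending on the directories configured for this cluster)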
1116
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1117

    
1118
  return []
1119

    
1120

    
1121
class LUNodeModifyStorage(NoHooksLU):
1122
  """Logical unit for modifying a storage volume on a node.
1123

1124
  """
1125
  REQ_BGL = False
1126

    
1127
  def CheckArguments(self):
1128
    (self.op.node_uuid, self.op.node_name) = \
1129
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1130

    
1131
    storage_type = self.op.storage_type
1132

    
1133
    try:
1134
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
1135
    except KeyError:
1136
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
1137
                                 " modified" % storage_type,
1138
                                 errors.ECODE_INVAL)
1139

    
1140
    diff = set(self.op.changes.keys()) - modifiable
1141
    if diff:
1142
      raise errors.OpPrereqError("The following fields can not be modified for"
1143
                                 " storage units of type '%s': %r" %
1144
                                 (storage_type, list(diff)),
1145
                                 errors.ECODE_INVAL)
1146

    
1147
  def CheckPrereq(self):
1148
    """Check prerequisites.
1149

1150
    """
1151
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1152

    
1153
  def ExpandNames(self):
1154
    self.needed_locks = {
1155
      locking.LEVEL_NODE: self.op.node_uuid,
1156
      }
1157

    
1158
  def Exec(self, feedback_fn):
1159
    """Computes the list of nodes and their attributes.
1160

1161
    """
1162
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1163
    result = self.rpc.call_storage_modify(self.op.node_uuid,
1164
                                          self.op.storage_type, st_args,
1165
                                          self.op.name, self.op.changes)
1166
    result.Raise("Failed to modify storage unit '%s' on %s" %
1167
                 (self.op.name, self.op.node_name))
1168

    
1169

    
1170
class NodeQuery(QueryBase):
1171
  FIELDS = query.NODE_FIELDS
1172

    
1173
  def ExpandNames(self, lu):
1174
    lu.needed_locks = {}
1175
    lu.share_locks = ShareAll()
1176

    
1177
    if self.names:
1178
      (self.wanted, _) = GetWantedNodes(lu, self.names)
1179
    else:
1180
      self.wanted = locking.ALL_SET
1181

    
1182
    self.do_locking = (self.use_locking and
1183
                       query.NQ_LIVE in self.requested_data)
1184

    
1185
    if self.do_locking:
1186
      # If any non-static field is requested we need to lock the nodes
1187
      lu.needed_locks[locking.LEVEL_NODE] = self.wanted
1188
      lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1189

    
1190
  def DeclareLocks(self, lu, level):
1191
    pass
1192

    
1193
  def _GetQueryData(self, lu):
1194
    """Computes the list of nodes and their attributes.
1195

1196
    """
1197
    all_info = lu.cfg.GetAllNodesInfo()
1198

    
1199
    node_uuids = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)
1200

    
1201
    # Gather data as requested
1202
    if query.NQ_LIVE in self.requested_data:
1203
      # filter out non-vm_capable nodes
1204
      toquery_node_uuids = [node.uuid for node in all_info.values()
1205
                            if node.vm_capable and node.uuid in node_uuids]
1206
      default_template = lu.cfg.GetClusterInfo().enabled_disk_templates[0]
1207
      raw_storage_units = utils.storage.GetStorageUnits(
1208
          lu.cfg, [default_template])
1209
      storage_units = rpc.PrepareStorageUnitsForNodes(
1210
          lu.cfg, raw_storage_units, toquery_node_uuids)
1211
      default_hypervisor = lu.cfg.GetHypervisorType()
1212
      hvparams = lu.cfg.GetClusterInfo().hvparams[default_hypervisor]
1213
      hvspecs = [(default_hypervisor, hvparams)]
1214
      node_data = lu.rpc.call_node_info(toquery_node_uuids, storage_units,
1215
                                        hvspecs)
1216
      live_data = dict(
1217
          (uuid, rpc.MakeLegacyNodeInfo(nresult.payload, default_template))
1218
          for (uuid, nresult) in node_data.items()
1219
          if not nresult.fail_msg and nresult.payload)
1220
    else:
1221
      live_data = None
1222

    
1223
    if query.NQ_INST in self.requested_data:
1224
      node_to_primary = dict([(uuid, set()) for uuid in node_uuids])
1225
      node_to_secondary = dict([(uuid, set()) for uuid in node_uuids])
1226

    
1227
      inst_data = lu.cfg.GetAllInstancesInfo()
1228
      inst_uuid_to_inst_name = {}
1229

    
1230
      for inst in inst_data.values():
1231
        inst_uuid_to_inst_name[inst.uuid] = inst.name
1232
        if inst.primary_node in node_to_primary:
1233
          node_to_primary[inst.primary_node].add(inst.uuid)
1234
        for secnode in inst.secondary_nodes:
1235
          if secnode in node_to_secondary:
1236
            node_to_secondary[secnode].add(inst.uuid)
1237
    else:
1238
      node_to_primary = None
1239
      node_to_secondary = None
1240
      inst_uuid_to_inst_name = None
1241

    
1242
    if query.NQ_OOB in self.requested_data:
1243
      oob_support = dict((uuid, bool(SupportsOob(lu.cfg, node)))
1244
                         for uuid, node in all_info.iteritems())
1245
    else:
1246
      oob_support = None
1247

    
1248
    if query.NQ_GROUP in self.requested_data:
1249
      groups = lu.cfg.GetAllNodeGroupsInfo()
1250
    else:
1251
      groups = {}
1252

    
1253
    return query.NodeQueryData([all_info[uuid] for uuid in node_uuids],
1254
                               live_data, lu.cfg.GetMasterNode(),
1255
                               node_to_primary, node_to_secondary,
1256
                               inst_uuid_to_inst_name, groups, oob_support,
1257
                               lu.cfg.GetClusterInfo())
1258

    
1259

    
1260
class LUNodeQuery(NoHooksLU):
1261
  """Logical unit for querying nodes.
1262

1263
  """
1264
  # pylint: disable=W0142
1265
  REQ_BGL = False
1266

    
1267
  def CheckArguments(self):
1268
    self.nq = NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
1269
                         self.op.output_fields, self.op.use_locking)
1270

    
1271
  def ExpandNames(self):
1272
    self.nq.ExpandNames(self)
1273

    
1274
  def DeclareLocks(self, level):
1275
    self.nq.DeclareLocks(self, level)
1276

    
1277
  def Exec(self, feedback_fn):
1278
    return self.nq.OldStyleQuery(self)
1279

    
1280

    
1281
def _CheckOutputFields(fields, selected):
1282
  """Checks whether all selected fields are valid according to fields.
1283

1284
  @type fields: L{utils.FieldSet}
1285
  @param fields: fields set
1286
  @type selected: L{utils.FieldSet}
1287
  @param selected: fields set
1288

1289
  """
1290
  delta = fields.NonMatching(selected)
1291
  if delta:
1292
    raise errors.OpPrereqError("Unknown output fields selected: %s"
1293
                               % ",".join(delta), errors.ECODE_INVAL)


class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(constants.VF_NODE, constants.VF_PHYS,
                                      constants.VF_VG, constants.VF_NAME,
                                      constants.VF_SIZE, constants.VF_INSTANCE),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    volumes = self.rpc.call_node_volumes(node_uuids)

    ilist = self.cfg.GetAllInstancesInfo()
    vol2inst = MapInstanceLvsToNodes(ilist.values())

    output = []
    for node_uuid in node_uuids:
      nresult = volumes[node_uuid]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue

      node_vols = sorted(nresult.payload,
                         key=operator.itemgetter(constants.VF_DEV))

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == constants.VF_NODE:
            val = self.cfg.GetNodeName(node_uuid)
          elif field == constants.VF_PHYS:
            val = vol[constants.VF_DEV]
          elif field == constants.VF_VG:
            val = vol[constants.VF_VG]
          elif field == constants.VF_NAME:
            val = vol[constants.VF_NAME]
          elif field == constants.VF_SIZE:
            val = int(float(vol[constants.VF_SIZE]))
          elif field == constants.VF_INSTANCE:
            inst = vol2inst.get((node_uuid, vol[constants.VF_VG] + "/" +
                                 vol[constants.VF_NAME]), None)
            if inst is not None:
              val = inst.name
            else:
              val = "-"
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output
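
  # vol2inst (built in Exec above) is keyed by (node_uuid, "<vg>/<lv name>"),
  # which is why the VF_INSTANCE lookup joins the volume group and LV name
  # with "/"; volumes that belong to no instance are reported as "-".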


class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def _DetermineStorageType(self):
    """Determines the default storage type of the cluster.

    """
    enabled_disk_templates = self.cfg.GetClusterInfo().enabled_disk_templates
    default_storage_type = \
        constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[enabled_disk_templates[0]]
    return default_storage_type

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.storage_type:
      CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()
      if self.storage_type not in constants.STS_REPORT:
        raise errors.OpPrereqError(
            "Storage reporting for storage type '%s' is not supported. Please"
            " use the --storage-type option to specify one of the supported"
            " storage types (%s) or set the default disk template to one that"
            " supports storage reporting." %
            (self.storage_type, utils.CommaJoin(constants.STS_REPORT)))

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    if self.op.storage_type:
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()

    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      nresult = data[node_uuid]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s",
                        node_name, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node_name
          elif field == constants.SF_TYPE:
            val = self.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result
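
  # Rough sketch of the field handling in Exec above: if the caller asks for
  # [SF_SIZE, SF_NODE], only [SF_NAME, SF_SIZE] is requested from the backend
  # (SF_NAME is always fetched so rows can be sorted, while SF_NODE and
  # SF_TYPE are known only to this LU and filled in locally); field_idx then
  # maps each backend field to its column so every row can be reordered into
  # the order the caller requested.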


class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_uuid)
    except ValueError:
      pass
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    assert node is not None

    masternode = self.cfg.GetMasterNode()
    if node.uuid == masternode:
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)

    for _, instance in self.cfg.GetAllInstancesInfo().items():
      if node.uuid in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first" % instance.name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    logging.info("Stopping the node daemon and removing configs from node %s",
                 self.node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # Promote nodes to master candidate as needed
    AdjustCandidatePool(self, exceptions=[self.node.uuid])
    self.context.RemoveNode(self.node)

    # Run post hooks on the node before it's removed
    RunPostHook(self, self.node.name)

    # we have to call this by name rather than by UUID, as the node is no
    # longer in the config
    result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node_uuid = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node_uuid,
                                              constants.ETC_HOSTS_REMOVE,
                                              self.node.name, None)
      result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)
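
  # The ordering above is deliberate: the candidate pool is adjusted and the
  # node dropped from the configuration before the node itself is contacted,
  # presumably so an unreachable node can still be removed; failures reported
  # by node_leave_cluster are therefore downgraded to warnings.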


class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    if (constants.SO_FIX_CONSISTENCY not in
        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " repaired" % storage_type,
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def _CheckFaultyDisks(self, instance, node_uuid):
    """Ensure faulty disks abort the opcode or at least warn."""
    try:
      if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                 node_uuid, True):
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                   " node '%s'" %
                                   (instance.name,
                                    self.cfg.GetNodeName(node_uuid)),
                                   errors.ECODE_STATE)
    except errors.OpPrereqError, err:
      if self.op.ignore_consistency:
        self.LogWarning(str(err.args[0]))
      else:
        raise

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

    # Check whether any instance on this node has faulty disks
    for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
      if not inst.disks_active:
        continue
      check_nodes = set(inst.all_nodes)
      check_nodes.discard(self.op.node_uuid)
      for inst_node_uuid in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_uuid)
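
  # CheckPrereq above checks, for every instance with active disks that uses
  # the node being repaired, the instance's *other* nodes for faulty disks
  # (the repaired node itself is discarded from check_nodes); with
  # op.ignore_consistency set, _CheckFaultyDisks only warns instead of
  # aborting.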

  def Exec(self, feedback_fn):
    feedback_fn("Repairing storage unit '%s' on %s ..." %
                (self.op.name, self.op.node_name))

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_execute(self.op.node_uuid,
                                           self.op.storage_type, st_args,
                                           self.op.name,
                                           constants.SO_FIX_CONSISTENCY)
    result.Raise("Failed to repair storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))