#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with nodes."""

import logging
import operator

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import qlang
from ganeti import query
from ganeti import rpc
from ganeti import utils
from ganeti.masterd import iallocator

from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
  ResultWithJobs
from ganeti.cmdlib.common import CheckParamsNotGlobal, \
  MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  IsExclusiveStorageEnabledNode, CheckNodePVs, \
  RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
  CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
  AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
  GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
  FindFaultyInstanceDisks, CheckStorageTypeEnabled


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max by one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should
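# Worked example of the promotion arithmetic above (values are illustrative,
# not taken from any real cluster): with candidate_pool_size = 10 and
# GetMasterCandidateStats() reporting mc_now = 3 current candidates where 3
# are wanted, the new node raises the target to min(3 + 1, 10) = 4, and since
# 3 < 4 the node promotes itself. With a full pool (mc_now = 10, pool size
# 10), the target stays at min(10 + 1, 10) = 10 and no promotion happens.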


def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: L{objects.Node}
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip,
  and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  # this can be called with a new node, which has no UUID yet, so perform the
  # RPC call using its name
  result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node.name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)
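# Note on the prereq flag above: the same check is shared between the
# CheckPrereq and Exec phases of an LU. LUNodeAdd.Exec() below calls it with
# prereq=False (failures become OpExecError), while LUNodeSetParams
# .CheckPrereq() calls it with prereq=True (failures abort the job early with
# OpPrereqError).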


class LUNodeAdd(LogicalUnit):
  """Logical unit for adding a node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _NFLAGS = ["master_capable", "vm_capable"]

  def CheckArguments(self):
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
    # validate/normalize the node name
    self.hostname = netutils.GetHostname(name=self.op.node_name,
                                         family=self.primary_ip_family)
    self.op.node_name = self.hostname.name

    if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
      raise errors.OpPrereqError("Cannot readd the master node",
                                 errors.ECODE_STATE)

    if self.op.readd and self.op.group:
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                 " being readded", errors.ECODE_INVAL)

    if self.op.ndparams:
      ovs = self.op.ndparams.get(constants.ND_OVS, None)
      ovs_name = self.op.ndparams.get(constants.ND_OVS_NAME, None)
      ovs_link = self.op.ndparams.get(constants.ND_OVS_LINK, None)

      # OpenvSwitch: Warn user if link is missing
      if ovs and not ovs_link:
        self.LogInfo("No physical interface for OpenvSwitch was given."
                     " OpenvSwitch will not have an outside connection. This"
                     " might not be what you want.")
      # OpenvSwitch: Fail if parameters are given, but OVS is not enabled.
      if not ovs and (ovs_name or ovs_link):
        raise errors.OpPrereqError("OpenvSwitch name or link were given, but"
                                   " OpenvSwitch is not enabled. Please enable"
                                   " OpenvSwitch with --ovs",
                                   errors.ECODE_INVAL)
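  # Illustrative shape of self.op.ndparams for an OpenvSwitch-enabled node
  # (the key names are the constants used above; the values are made up):
  #   {constants.ND_OVS: True,
  #    constants.ND_OVS_NAME: "switch1",
  #    constants.ND_OVS_LINK: "eth1"}
  # With ND_OVS set but ND_OVS_LINK missing, only the informational warning
  # above is printed; with ND_OVS unset but a name or link given, the
  # OpPrereqError above is raised.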

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    hook_nodes = self.cfg.GetNodeList()
    new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
    if new_node_info is not None:
      # Exclude added node
      hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))

    # add the new node as post hook node by name; it does not have a UUID yet
    return (hook_nodes, hook_nodes, [self.op.node_name, ])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.hostname.name
    self.op.primary_ip = self.hostname.ip
    if self.op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using an IPv6 primary address, a valid"
                                   " IPv4 address must be given as secondary",
                                   errors.ECODE_INVAL)
      self.op.secondary_ip = self.op.primary_ip

    secondary_ip = self.op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip, errors.ECODE_INVAL)

    existing_node_info = self.cfg.GetNodeInfoByName(node_name)
    if not self.op.readd and existing_node_info is not None:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node_name, errors.ECODE_EXISTS)
    elif self.op.readd and existing_node_info is None:
      raise errors.OpPrereqError("Node %s is not in the configuration" %
                                 node_name, errors.ECODE_NOENT)

    self.changed_primary_ip = False

    for existing_node in self.cfg.GetAllNodesInfo().values():
      if self.op.readd and node_name == existing_node.name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != self.op.primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == self.op.primary_ip or
          existing_node.secondary_ip == self.op.primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # After this 'if' block, None is no longer a valid value for the
    # _capable op attributes
    if self.op.readd:
      assert existing_node_info is not None, \
        "Can't retrieve locked node %s" % node_name
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, getattr(existing_node_info, attr))
    else:
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, True)

    if self.op.readd and not self.op.vm_capable:
      pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % node_name,
                                   errors.ECODE_STATE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = self.cfg.GetMasterNodeInfo()
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == self.op.primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                              source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [existing_node_info.uuid]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False

    node_group = self.cfg.LookupNodeGroup(self.op.group)

    if self.op.readd:
      self.new_node = existing_node_info
    else:
      self.new_node = objects.Node(name=node_name,
                                   primary_ip=self.op.primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=node_group, ndparams={})

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)

    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
    #       it a property on the base class.
    rpcrunner = rpc.DnsOnlyRunner()
    result = rpcrunner.call_version([node_name])[node_name]
    result.Raise("Can't get version information from node %s" % node_name)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node_name, result.payload)
    else:
      raise errors.OpPrereqError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.payload),
                                 errors.ECODE_ENVIRON)

    vg_name = self.cfg.GetVGName()
    if vg_name is not None:
      vparams = {constants.NV_PVLIST: [vg_name]}
      excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
      cname = self.cfg.GetClusterName()
      result = rpcrunner.call_node_verify_light(
          [node_name], vparams, cname,
          self.cfg.GetClusterInfo().hvparams,
          {node_name: node_group},
          self.cfg.GetAllNodeGroupsInfoDict()
        )[node_name]
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
      if errmsgs:
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # We are adding a new node, so we assume it's powered
    self.new_node.powered = True

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      self.new_node.drained = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      self.new_node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        self.new_node.primary_ip = self.op.primary_ip

    # copy the master/vm_capable flags
    for attr in self._NFLAGS:
      setattr(self.new_node, attr, getattr(self.op, attr))

    # notify the user about any possible mc promotion
    if self.new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    if self.op.ndparams:
      self.new_node.ndparams = self.op.ndparams
    else:
      self.new_node.ndparams = {}

    if self.op.hv_state:
      self.new_node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.new_node.disk_state_static = self.new_disk_state

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(
                 master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
                 self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    if self.new_node.secondary_ip != self.new_node.primary_ip:
      _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
                               False)

    node_verifier_uuids = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: ([self.new_node.name], {}),
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(
               node_verifier_uuids, node_verify_param,
               self.cfg.GetClusterName(),
               self.cfg.GetClusterInfo().hvparams,
               {self.new_node.name: self.cfg.LookupNodeGroup(self.op.group)},
               self.cfg.GetAllNodeGroupsInfoDict()
               )
    for verifier in node_verifier_uuids:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed")

    # OpenvSwitch initialization on the node
    ovs = self.new_node.ndparams.get(constants.ND_OVS, None)
    ovs_name = self.new_node.ndparams.get(constants.ND_OVS_NAME, None)
    ovs_link = self.new_node.ndparams.get(constants.ND_OVS_LINK, None)

    if ovs:
      result = self.rpc.call_node_configure_ovs(
                 self.new_node.name, ovs_name, ovs_link)

    if self.op.readd:
      self.context.ReaddNode(self.new_node)
      RedistributeAncillaryFiles(self)
      # make sure we redistribute the config
      self.cfg.Update(self.new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not self.new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
        result.Warn("Node failed to demote itself from master candidate status",
                    self.LogWarning)
    else:
      self.context.AddNode(self.new_node, self.proc.GetECId())
      RedistributeAncillaryFiles(self)


class LUNodeSetParams(LogicalUnit):
  """Modifies the parameters of a node.

  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
      to the node role (as _ROLE_*)
  @cvar _R2F: a dictionary from node role to tuples of flags
  @cvar _FLAGS: a list of attribute names corresponding to the flags

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
  _F2R = {
    (True, False, False): _ROLE_CANDIDATE,
    (False, True, False): _ROLE_DRAINED,
    (False, False, True): _ROLE_OFFLINE,
    (False, False, False): _ROLE_REGULAR,
    }
  _R2F = dict((v, k) for k, v in _F2R.items())
  _FLAGS = ["master_candidate", "drained", "offline"]
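  # The role encoding above, spelled out (each key is a
  # (master_candidate, drained, offline) tuple, in the same order as _FLAGS):
  #   (True,  False, False) -> _ROLE_CANDIDATE
  #   (False, True,  False) -> _ROLE_DRAINED
  #   (False, False, True)  -> _ROLE_OFFLINE
  #   (False, False, False) -> _ROLE_REGULAR
  # _R2F is the inverse mapping, used in Exec() to write the new flags back
  # onto the node object.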

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
                self.op.master_capable, self.op.vm_capable,
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
                self.op.disk_state]
    if all_mods.count(None) == len(all_mods):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # Boolean value that tells us whether we might be demoting from MC
    self.might_demote = (self.op.master_candidate is False or
                         self.op.offline is True or
                         self.op.drained is True or
                         self.op.master_capable is False)

    if self.op.secondary_ip:
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                   " address" % self.op.secondary_ip,
                                   errors.ECODE_INVAL)

    self.lock_all = self.op.auto_promote and self.might_demote
    self.lock_instances = self.op.secondary_ip is not None

  def _InstanceFilter(self, instance):
    """Filter for getting affected instances.

    """
    return (instance.disk_template in constants.DTS_INT_MIRROR and
            self.op.node_uuid in instance.all_nodes)

  def ExpandNames(self):
    if self.lock_all:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,

        # Block allocations when all nodes are locked
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: self.op.node_uuid,
        }

    # Since modifying a node can have severe effects on currently running
    # operations the resource lock is at least acquired in shared mode
    self.needed_locks[locking.LEVEL_NODE_RES] = \
      self.needed_locks[locking.LEVEL_NODE]

    # Get all locks except nodes in shared mode; they are not used for anything
    # but read-only access
    self.share_locks = ShareAll()
    self.share_locks[locking.LEVEL_NODE] = 0
    self.share_locks[locking.LEVEL_NODE_RES] = 0
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 0

    if self.lock_instances:
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(
          self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    if self.lock_instances:
      affected_instances = \
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)

      # Verify instance locks
      owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
      wanted_instance_names = frozenset([inst.name for inst in
                                         affected_instances.values()])
      if wanted_instance_names - owned_instance_names:
        raise errors.OpPrereqError("Instances affected by changing node %s's"
                                   " secondary IP address have changed since"
                                   " locks were acquired, wanted '%s', have"
                                   " '%s'; retry the operation" %
                                   (node.name,
                                    utils.CommaJoin(wanted_instance_names),
                                    utils.CommaJoin(owned_instance_names)),
                                   errors.ECODE_STATE)
    else:
      affected_instances = None

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if node.uuid == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via master-failover",
                                   errors.ECODE_INVAL)

    if self.op.master_candidate and not node.master_capable:
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
                                 " it a master candidate" % node.name,
                                 errors.ECODE_STATE)

    if self.op.vm_capable is False:
      (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
      if ipri or isec:
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
                                   " the vm_capable flag" % node.name,
                                   errors.ECODE_STATE)

    if node.master_candidate and self.might_demote and not self.lock_all:
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
      # check if after removing the current node, we're missing master
      # candidates
      (mc_remaining, mc_should, _) = \
          self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
      if mc_remaining < mc_should:
        raise errors.OpPrereqError("Not enough master candidates, please"
                                   " pass auto promote option to allow"
                                   " promotion (--auto-promote or RAPI"
                                   " auto_promote=True)", errors.ECODE_STATE)

    self.old_flags = old_flags = (node.master_candidate,
                                  node.drained, node.offline)
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
    self.old_role = old_role = self._F2R[old_flags]

    # Check for ineffective changes
    for attr in self._FLAGS:
      if getattr(self.op, attr) is False and getattr(node, attr) is False:
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
        setattr(self.op, attr, None)

    # Past this point, any flag change to False means a transition
    # away from the respective state, as only real changes are kept

    # TODO: We might query the real power state if it supports OOB
    if SupportsOob(self.cfg, node):
      if self.op.offline is False and not (node.powered or
                                           self.op.powered is True):
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
                                    " offline status can be reset") %
                                   self.op.node_name, errors.ECODE_STATE)
    elif self.op.powered is not None:
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
                                  " as it does not support out-of-band"
                                  " handling") % self.op.node_name,
                                 errors.ECODE_STATE)

    # If we're being deofflined/drained, we'll MC ourself if needed
    if (self.op.drained is False or self.op.offline is False or
        (self.op.master_capable and not node.master_capable)):
      if _DecideSelfPromotion(self):
        self.op.master_candidate = True
        self.LogInfo("Auto-promoting node to master candidate")

    # If we're no longer master capable, we'll demote ourselves from MC
    if self.op.master_capable is False and node.master_candidate:
      self.LogInfo("Demoting from master candidate")
      self.op.master_candidate = False

    # Compute new role
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
    if self.op.master_candidate:
      new_role = self._ROLE_CANDIDATE
    elif self.op.drained:
      new_role = self._ROLE_DRAINED
    elif self.op.offline:
      new_role = self._ROLE_OFFLINE
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
      # False is still in new flags, which means we're un-setting (the
      # only) True flag
      new_role = self._ROLE_REGULAR
    else: # no new flags, nothing, keep old role
      new_role = old_role

    self.new_role = new_role

    if old_role == self._ROLE_OFFLINE and new_role != old_role:
      # Trying to transition out of offline status
      result = self.rpc.call_version([node.uuid])[node.uuid]
      if result.fail_msg:
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
                                   " to report its version: %s" %
                                   (node.name, result.fail_msg),
                                   errors.ECODE_STATE)
      else:
        self.LogWarning("Transitioning node from offline to online state"
                        " without using re-add. Please make sure the node"
                        " is healthy!")

    # When changing the secondary ip, verify if this is a single-homed to
    # multi-homed transition or vice versa, and apply the relevant
    # restrictions.
    if self.op.secondary_ip:
      # Ok even without locking, because this can't be changed by any LU
      master = self.cfg.GetMasterNodeInfo()
      master_singlehomed = master.secondary_ip == master.primary_ip
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from single-homed to multi-homed"
                          " cluster; all nodes will require a secondary IP"
                          " address")
        else:
          raise errors.OpPrereqError("Changing the secondary ip on a"
                                     " single-homed cluster requires the"
                                     " --force option to be passed, and the"
                                     " target node to be the master",
                                     errors.ECODE_INVAL)
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from multi-homed to single-homed"
                          " cluster; secondary IP addresses will have to be"
                          " removed")
        else:
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
                                     " same as the primary IP on a multi-homed"
                                     " cluster, unless the --force option is"
                                     " passed, and the target node is the"
                                     " master", errors.ECODE_INVAL)

      assert not (set([inst.name for inst in affected_instances.values()]) -
                  self.owned_locks(locking.LEVEL_INSTANCE))

      if node.offline:
        if affected_instances:
          msg = ("Cannot change secondary IP address: offline node has"
                 " instances (%s) configured to use it" %
                 utils.CommaJoin(
                   [inst.name for inst in affected_instances.values()]))
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
      else:
        # On online nodes, check that no instances are running, and that
        # the node has the new ip and we can reach it.
        for instance in affected_instances.values():
          CheckInstanceState(self, instance, INSTANCE_DOWN,
                             msg="cannot change secondary ip")

        _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
        if master.uuid != node.uuid:
          # check reachability from master secondary ip to new secondary ip
          if not netutils.TcpPing(self.op.secondary_ip,
                                  constants.DEFAULT_NODED_PORT,
                                  source=master.secondary_ip):
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                       " based ping to node daemon port",
                                       errors.ECODE_ENVIRON)

    if self.op.ndparams:
      new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")
      self.new_ndparams = new_ndparams

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                                node.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    result = []

    if self.op.ndparams:
      node.ndparams = self.new_ndparams

    if self.op.powered is not None:
      node.powered = self.op.powered

    if self.op.hv_state:
      node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      node.disk_state_static = self.new_disk_state

    for attr in ["master_capable", "vm_capable"]:
      val = getattr(self.op, attr)
      if val is not None:
        setattr(node, attr, val)
        result.append((attr, str(val)))

    if self.new_role != self.old_role:
      # Tell the node to demote itself, if no longer MC and not offline
      if self.old_role == self._ROLE_CANDIDATE and \
          self.new_role != self._ROLE_OFFLINE:
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s", msg)

      new_flags = self._R2F[self.new_role]
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
        if of != nf:
          result.append((desc, str(nf)))
      (node.master_candidate, node.drained, node.offline) = new_flags

      # we locked all nodes, we adjust the CP before updating this node
      if self.lock_all:
        AdjustCandidatePool(self, [node.uuid])

    if self.op.secondary_ip:
      node.secondary_ip = self.op.secondary_ip
      result.append(("secondary_ip", self.op.secondary_ip))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)

    # this will trigger job queue propagation or cleanup if the mc
    # flag changed
    if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
      self.context.ReaddNode(node)

    return result


class LUNodePowercycle(NoHooksLU):
  """Powercycles a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    default_hypervisor = self.cfg.GetHypervisorType()
    hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
    result = self.rpc.call_node_powercycle(self.op.node_uuid,
                                           default_hypervisor,
                                           hvparams)
    result.Raise("Failed to schedule the reboot")
    return result.payload


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodePrimaryInstances(cfg, node_uuid):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_uuid):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid in inst.secondary_nodes)


def _GetNodeInstances(cfg, node_uuid):
  """Returns a list of all primary and secondary instances on a node.

  """

  return _GetNodeInstancesInner(cfg, lambda inst: node_uuid in inst.all_nodes)
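# Sketch of how the three helpers above differ, assuming a hypothetical DRBD
# instance "inst1" whose primary node has UUID "uuid-A" and whose secondary
# node has UUID "uuid-B":
#   _GetNodePrimaryInstances(cfg, "uuid-A")   -> [inst1]
#   _GetNodeSecondaryInstances(cfg, "uuid-A") -> []
#   _GetNodeSecondaryInstances(cfg, "uuid-B") -> [inst1]
#   _GetNodeInstances(cfg, "uuid-B")          -> [inst1]  (primary or secondary)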


class LUNodeEvacuate(NoHooksLU):
  """Evacuates instances off a list of nodes.

  """
  REQ_BGL = False

  def CheckArguments(self):
    CheckIAllocatorOrNode(self, "iallocator", "remote_node")

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)
      assert self.op.remote_node

      if self.op.node_uuid == self.op.remote_node_uuid:
        raise errors.OpPrereqError("Can not use evacuated node as a new"
                                   " secondary node", errors.ECODE_INVAL)

      if self.op.mode != constants.NODE_EVAC_SEC:
        raise errors.OpPrereqError("Without the use of an iallocator only"
                                   " secondary instances can be evacuated",
                                   errors.ECODE_INVAL)

    # Declare locks
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

    # Determine nodes (via group) optimistically, needs verification once locks
    # have been acquired
    self.lock_nodes = self._DetermineNodes()

  def _DetermineNodes(self):
    """Gets the list of node UUIDs to operate on.

    """
    if self.op.remote_node is None:
      # Iallocator will choose any node(s) in the same group
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
    else:
      group_nodes = frozenset([self.op.remote_node_uuid])

    # Determine nodes to be locked
    return set([self.op.node_uuid]) | group_nodes

  def _DetermineInstances(self):
    """Builds list of instances to operate on.

    """
    assert self.op.mode in constants.NODE_EVAC_MODES

    if self.op.mode == constants.NODE_EVAC_PRI:
      # Primary instances only
      inst_fn = _GetNodePrimaryInstances
      assert self.op.remote_node is None, \
        "Evacuating primary instances requires iallocator"
    elif self.op.mode == constants.NODE_EVAC_SEC:
      # Secondary instances only
      inst_fn = _GetNodeSecondaryInstances
    else:
      # All instances
      assert self.op.mode == constants.NODE_EVAC_ALL
      inst_fn = _GetNodeInstances
      # TODO: In 2.6, change the iallocator interface to take an evacuation mode
      # per instance
      raise errors.OpPrereqError("Due to an issue with the iallocator"
                                 " interface it is not possible to evacuate"
                                 " all instances at once; specify explicitly"
                                 " whether to evacuate primary or secondary"
                                 " instances",
                                 errors.ECODE_INVAL)

    return inst_fn(self.cfg, self.op.node_uuid)
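  # Mode handling above in short: NODE_EVAC_PRI evacuates only instances whose
  # primary node is the evacuated node (and requires an iallocator),
  # NODE_EVAC_SEC only those using it as a secondary node, and NODE_EVAC_ALL
  # is rejected because the current iallocator interface cannot express a
  # per-instance evacuation mode.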

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        set(i.name for i in self._DetermineInstances())

    elif level == locking.LEVEL_NODEGROUP:
      # Lock node groups for all potential target nodes optimistically, needs
      # verification once nodes have been acquired
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)

    elif level == locking.LEVEL_NODE:
      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes

  def CheckPrereq(self):
    # Verify locks
    owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)

    need_nodes = self._DetermineNodes()

    if not owned_nodes.issuperset(need_nodes):
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
                                 " locks were acquired, current nodes are"
                                 " '%s', used to be '%s'; retry the"
                                 " operation" %
                                 (self.op.node_name,
                                  utils.CommaJoin(need_nodes),
                                  utils.CommaJoin(owned_nodes)),
                                 errors.ECODE_STATE)

    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
    if owned_groups != wanted_groups:
      raise errors.OpExecError("Node groups changed since locks were acquired,"
                               " current groups are '%s', used to be '%s';"
                               " retry the operation" %
                               (utils.CommaJoin(wanted_groups),
                                utils.CommaJoin(owned_groups)))

    # Determine affected instances
    self.instances = self._DetermineInstances()
    self.instance_names = [i.name for i in self.instances]

    if set(self.instance_names) != owned_instance_names:
      raise errors.OpExecError("Instances on node '%s' changed since locks"
                               " were acquired, current instances are '%s',"
                               " used to be '%s'; retry the operation" %
                               (self.op.node_name,
                                utils.CommaJoin(self.instance_names),
                                utils.CommaJoin(owned_instance_names)))

    if self.instance_names:
      self.LogInfo("Evacuating instances from node '%s': %s",
                   self.op.node_name,
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
    else:
      self.LogInfo("No instances to evacuate from node '%s'",
                   self.op.node_name)

    if self.op.remote_node is not None:
      for i in self.instances:
        if i.primary_node == self.op.remote_node_uuid:
          raise errors.OpPrereqError("Node %s is the primary node of"
                                     " instance %s, cannot use it as"
                                     " secondary" %
                                     (self.op.remote_node, i.name),
                                     errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)

    if not self.instance_names:
      # No instances to evacuate
      jobs = []

    elif self.op.iallocator is not None:
      # TODO: Implement relocation to other group
      req = iallocator.IAReqNodeEvac(evac_mode=self.op.mode,
                                     instances=list(self.instance_names))
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)

      ial.Run(self.op.iallocator)

      if not ial.success:
        raise errors.OpPrereqError("Can't compute node evacuation using"
                                   " iallocator '%s': %s" %
                                   (self.op.iallocator, ial.info),
                                   errors.ECODE_NORES)

      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)

    elif self.op.remote_node is not None:
      assert self.op.mode == constants.NODE_EVAC_SEC
      jobs = [
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
                                        remote_node=self.op.remote_node,
                                        disks=[],
                                        mode=constants.REPLACE_DISK_CHG,
                                        early_release=self.op.early_release)]
        for instance_name in self.instance_names]

    else:
      raise errors.ProgrammerError("No iallocator or remote node")

    return ResultWithJobs(jobs)


class LUNodeMigrate(LogicalUnit):
  """Migrate all instances from a node.

  """
  HPATH = "node-migrate"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False

  def CheckArguments(self):
    pass

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    return {
      "NODE_NAME": self.op.node_name,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()]
    return (nl, nl)

  def CheckPrereq(self):
    pass

  def Exec(self, feedback_fn):
    # Prepare jobs for migrating instances
    jobs = [
      [opcodes.OpInstanceMigrate(
        instance_name=inst.name,
        mode=self.op.mode,
        live=self.op.live,
        iallocator=self.op.iallocator,
        target_node=self.op.target_node,
        allow_runtime_changes=self.op.allow_runtime_changes,
        ignore_ipolicy=self.op.ignore_ipolicy)]
      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]

    # TODO: Run iallocator in this opcode and pass correct placement options to
    # OpInstanceMigrate. Since other jobs can modify the cluster between
    # running the iallocator and the actual migration, a good consistency model
    # will have to be found.

    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset([self.op.node_uuid]))

    return ResultWithJobs(jobs)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]

  return []
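# Example return values for the helper above (directory paths are illustrative
# defaults, not read from any configuration):
#   _GetStorageTypeArgs(cfg, constants.ST_FILE)
#     -> [["/srv/ganeti/file-storage", "/srv/ganeti/shared-file-storage"]]
#   any other storage type -> []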


class LUNodeModifyStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_uuid,
      }

  def Exec(self, feedback_fn):
    """Modifies a storage unit on the given node.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_uuid,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


class NodeQuery(QueryBase):
  FIELDS = query.NODE_FIELDS

  def ExpandNames(self, lu):
    lu.needed_locks = {}
    lu.share_locks = ShareAll()

    if self.names:
      (self.wanted, _) = GetWantedNodes(lu, self.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = (self.use_locking and
                       query.NQ_LIVE in self.requested_data)

    if self.do_locking:
      # If any non-static field is requested we need to lock the nodes
      lu.needed_locks[locking.LEVEL_NODE] = self.wanted
      lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    all_info = lu.cfg.GetAllNodesInfo()

    node_uuids = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)

    # Gather data as requested
    if query.NQ_LIVE in self.requested_data:
      # filter out non-vm_capable nodes
      toquery_node_uuids = [node.uuid for node in all_info.values()
                            if node.vm_capable and node.uuid in node_uuids]
      default_template = lu.cfg.GetClusterInfo().enabled_disk_templates[0]
      raw_storage_units = utils.storage.GetStorageUnits(
          lu.cfg, [default_template])
      storage_units = rpc.PrepareStorageUnitsForNodes(
          lu.cfg, raw_storage_units, toquery_node_uuids)
      default_hypervisor = lu.cfg.GetHypervisorType()
      hvparams = lu.cfg.GetClusterInfo().hvparams[default_hypervisor]
      hvspecs = [(default_hypervisor, hvparams)]
      node_data = lu.rpc.call_node_info(toquery_node_uuids, storage_units,
                                        hvspecs)
      live_data = dict(
          (uuid, rpc.MakeLegacyNodeInfo(nresult.payload, default_template))
          for (uuid, nresult) in node_data.items()
          if not nresult.fail_msg and nresult.payload)
    else:
      live_data = None

    if query.NQ_INST in self.requested_data:
      node_to_primary = dict([(uuid, set()) for uuid in node_uuids])
      node_to_secondary = dict([(uuid, set()) for uuid in node_uuids])

      inst_data = lu.cfg.GetAllInstancesInfo()
      inst_uuid_to_inst_name = {}

      for inst in inst_data.values():
        inst_uuid_to_inst_name[inst.uuid] = inst.name
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.uuid)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.uuid)
    else:
      node_to_primary = None
      node_to_secondary = None
      inst_uuid_to_inst_name = None

    if query.NQ_OOB in self.requested_data:
      oob_support = dict((uuid, bool(SupportsOob(lu.cfg, node)))
                         for uuid, node in all_info.iteritems())
    else:
      oob_support = None

    if query.NQ_GROUP in self.requested_data:
      groups = lu.cfg.GetAllNodeGroupsInfo()
    else:
      groups = {}

    return query.NodeQueryData([all_info[uuid] for uuid in node_uuids],
                               live_data, lu.cfg.GetMasterNode(),
                               node_to_primary, node_to_secondary,
                               inst_uuid_to_inst_name, groups, oob_support,
                               lu.cfg.GetClusterInfo())


class LUNodeQuery(NoHooksLU):
  """Logical unit for querying nodes.

  """
  # pylint: disable=W0142
  REQ_BGL = False

  def CheckArguments(self):
    self.nq = NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
                        self.op.output_fields, self.op.use_locking)

  def ExpandNames(self):
    self.nq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.nq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    return self.nq.OldStyleQuery(self)


def _CheckOutputFields(fields, selected):
  """Checks whether all selected fields are valid according to fields.

  @type fields: L{utils.FieldSet}
  @param fields: fields set
  @type selected: L{utils.FieldSet}
  @param selected: fields set

  """
  delta = fields.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)
1303
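# Illustrative use (hypothetical field names): given
# _CheckOutputFields(utils.FieldSet("name", "size"), ["name", "bogus"]),
# "bogus" does not match and an OpPrereqError listing it is raised.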

    
class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False
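  # Note (assumed wiring): presumably backs "gnt-node volumes" via the
  # OpNodeQueryvols opcode; Exec returns one plain row per logical volume,
  # with the columns in the order given by self.op.output_fields.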

    
  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(constants.VF_NODE, constants.VF_PHYS,
                                      constants.VF_VG, constants.VF_NAME,
                                      constants.VF_SIZE, constants.VF_INSTANCE),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }
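    # With no node names given, every node lock (plus the node allocation
    # lock) is acquired; all locks are taken shared (see ShareAll() above).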

    
  def Exec(self, feedback_fn):
    """Computes the list of volumes on the nodes and their attributes.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    volumes = self.rpc.call_node_volumes(node_uuids)

    ilist = self.cfg.GetAllInstancesInfo()
    vol2inst = MapInstanceLvsToNodes(ilist.values())

    output = []
    for node_uuid in node_uuids:
      nresult = volumes[node_uuid]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue

      node_vols = sorted(nresult.payload,
                         key=operator.itemgetter(constants.VF_DEV))

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == constants.VF_NODE:
            val = self.cfg.GetNodeName(node_uuid)
          elif field == constants.VF_PHYS:
            val = vol[constants.VF_DEV]
          elif field == constants.VF_VG:
            val = vol[constants.VF_VG]
          elif field == constants.VF_NAME:
            val = vol[constants.VF_NAME]
          elif field == constants.VF_SIZE:
            val = int(float(vol[constants.VF_SIZE]))
          elif field == constants.VF_INSTANCE:
            inst = vol2inst.get((node_uuid, vol[constants.VF_VG] + "/" +
                                 vol[constants.VF_NAME]), None)
            if inst is not None:
              val = inst.name
            else:
              val = "-"
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

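    # Illustrative shape of the result (hypothetical values), assuming
    # output_fields is [node, phys, vg, name, size, instance]:
    #   [["node1.example.com", "/dev/sdb1", "xenvg", "disk-0", "2048",
    #     "instance1.example.com"], ...]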
    return output

    
class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  REQ_BGL = False
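  # Note (assumed wiring): presumably backs "gnt-node list-storage" via the
  # OpNodeQueryStorage opcode; it reports units of a single storage type,
  # either the one requested or the one derived from the default template.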

    
  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def _DetermineStorageType(self):
    """Determines the default storage type of the cluster.

    """
    enabled_disk_templates = self.cfg.GetClusterInfo().enabled_disk_templates
    default_storage_type = \
        constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[enabled_disk_templates[0]]
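    # Illustrative (hypothetical example): if the first enabled disk template
    # is "plain", the map above is expected to yield the LVM volume-group
    # storage type.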
    return default_storage_type

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.storage_type:
      CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()
      if self.storage_type not in constants.STS_REPORT:
        raise errors.OpPrereqError(
            "Storage reporting for storage type '%s' is not supported. Please"
            " use the --storage-type option to specify one of the supported"
            " storage types (%s) or set the default disk template to one that"
            " supports storage reporting." %
            (self.storage_type, utils.CommaJoin(constants.STS_REPORT)))

    
  def Exec(self, feedback_fn):
    """Computes the list of storage units on the nodes and their attributes.

    """
    if self.op.storage_type:
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()

    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]
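    # Illustrative (hypothetical request): for output_fields ==
    # [constants.SF_NODE, F] with some reportable field F, fields becomes
    # [constants.SF_NAME, F] (name added for sorting, node/type stripped),
    # so field_idx == {SF_NAME: 0, F: 1} and name_idx == 0.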

    
    st_args = _GetStorageTypeArgs(self.cfg, self.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      nresult = data[node_uuid]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s",
                        node_name, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node_name
          elif field == constants.SF_TYPE:
            val = self.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result

    
class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
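  # Note (assumed wiring): presumably reached via the OpNodeRemove opcode
  # ("gnt-node remove"); the "node-remove" hooks run on all remaining nodes,
  # see BuildHooksNodes below.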

    
  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_uuid)
    except ValueError:
      pass
    return (all_nodes, all_nodes)

    
  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    assert node is not None

    masternode = self.cfg.GetMasterNode()
    if node.uuid == masternode:
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)

    for _, instance in self.cfg.GetAllInstancesInfo().items():
      if node.uuid in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first" % instance.name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

    
  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    logging.info("Stopping the node daemon and removing configs from node %s",
                 self.node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"
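
    # Teardown order below: re-balance the master candidate pool, drop the
    # node from the configuration, run post hooks while the node is still
    # known, ask the node itself to leave the cluster, then clean up
    # /etc/hosts and redistribute ancillary files.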

    
    # Promote nodes to master candidate as needed
    AdjustCandidatePool(self, exceptions=[self.node.uuid])
    self.context.RemoveNode(self.node)

    # Run post hooks on the node before it's removed
    RunPostHook(self, self.node.name)

    # we have to call this by name rather than by UUID, as the node is no
    # longer in the config
    result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node_uuid = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node_uuid,
                                              constants.ETC_HOSTS_REMOVE,
                                              self.node.name, None)
      result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)

    
class LURepairNodeStorage(NoHooksLU):
  """Repairs a storage unit (typically the volume group) on a node.

  """
  REQ_BGL = False
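  # Note (assumed wiring): presumably backs "gnt-node repair-storage" via
  # the OpRepairNodeStorage opcode; only storage types whose valid operations
  # include SO_FIX_CONSISTENCY can be repaired (checked in CheckArguments).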

    
  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    if (constants.SO_FIX_CONSISTENCY not in
        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
      raise errors.OpPrereqError("Storage units of type '%s' cannot be"
                                 " repaired" % storage_type,
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

    
  def _CheckFaultyDisks(self, instance, node_uuid):
    """Ensure faulty disks abort the opcode or at least warn."""
    try:
      if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                 node_uuid, True):
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                   " node '%s'" %
                                   (instance.name,
                                    self.cfg.GetNodeName(node_uuid)),
                                   errors.ECODE_STATE)
    except errors.OpPrereqError, err:
      if self.op.ignore_consistency:
        self.LogWarning(str(err.args[0]))
      else:
        raise

    
  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

    # Check whether any instance on this node has faulty disks on one of its
    # other nodes
    for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
      if not inst.disks_active:
        continue
      check_nodes = set(inst.all_nodes)
      check_nodes.discard(self.op.node_uuid)
      for inst_node_uuid in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_uuid)

  def Exec(self, feedback_fn):
    feedback_fn("Repairing storage unit '%s' on %s ..." %
                (self.op.name, self.op.node_name))

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_execute(self.op.node_uuid,
                                           self.op.storage_type, st_args,
                                           self.op.name,
                                           constants.SO_FIX_CONSISTENCY)
    result.Raise("Failed to repair storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))