#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with nodes."""

import logging
import operator

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import qlang
from ganeti import query
from ganeti import rpc
from ganeti import utils
from ganeti.masterd import iallocator

from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
  ResultWithJobs
from ganeti.cmdlib.common import CheckParamsNotGlobal, \
  MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  IsExclusiveStorageEnabledNode, CheckNodePVs, \
  RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
  CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
  AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
  GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
  FindFaultyInstanceDisks, CheckStorageTypeEnabled


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max with one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should
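# Added note (not in the original source), a worked example of the decision
# above: with candidate_pool_size = 10 and GetMasterCandidateStats() reporting
# mc_now = 3 current candidates and mc_should = 5 desirable ones, the node
# being considered raises the target to min(5 + 1, 10) = 6, so the function
# returns True (3 < 6) and the caller promotes the node to master candidate.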


def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: L{objects.Node}
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip,
  and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  # this can be called with a new node, which has no UUID yet, so perform the
  # RPC call using its name
  result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node.name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)


class LUNodeAdd(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _NFLAGS = ["master_capable", "vm_capable"]

  def CheckArguments(self):
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
    # validate/normalize the node name
    self.hostname = netutils.GetHostname(name=self.op.node_name,
                                         family=self.primary_ip_family)
    self.op.node_name = self.hostname.name

    if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
      raise errors.OpPrereqError("Cannot readd the master node",
                                 errors.ECODE_STATE)

    if self.op.readd and self.op.group:
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                 " being readded", errors.ECODE_INVAL)

    if self.op.ndparams:
      ovs = self.op.ndparams.get(constants.ND_OVS, None)
      ovs_name = self.op.ndparams.get(constants.ND_OVS_NAME, None)
      ovs_link = self.op.ndparams.get(constants.ND_OVS_LINK, None)

      # OpenvSwitch: Warn user if link is missing
      if ovs and not ovs_link:
        self.LogInfo("No physical interface for OpenvSwitch was given."
                     " OpenvSwitch will not have an outside connection. This"
                     " might not be what you want.")
      # OpenvSwitch: Fail if parameters are given, but OVS is not enabled.
      if not ovs and (ovs_name or ovs_link):
        raise errors.OpPrereqError("OpenvSwitch name or link were given, but"
                                   " OpenvSwitch is not enabled. Please enable"
                                   " OpenvSwitch with --ovs",
                                   errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    hook_nodes = self.cfg.GetNodeList()
    new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
    if new_node_info is not None:
      # Exclude added node
      hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))

    # add the new node as post hook node by name; it does not have an UUID yet
    return (hook_nodes, hook_nodes, [self.op.node_name, ])
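    # Added note (assumption based on how the tuple is built here, not stated
    # in the original file): the three elements appear to be the nodes for the
    # pre-hook, the nodes for the post-hook, and additional post-hook targets
    # addressed by name, which is needed because the node being added has no
    # UUID in the configuration yet.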

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.hostname.name
    self.op.primary_ip = self.hostname.ip
    if self.op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using an IPv6 primary address, a valid"
                                   " IPv4 address must be given as secondary",
                                   errors.ECODE_INVAL)
      self.op.secondary_ip = self.op.primary_ip

    secondary_ip = self.op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip, errors.ECODE_INVAL)

    existing_node_info = self.cfg.GetNodeInfoByName(node_name)
    if not self.op.readd and existing_node_info is not None:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node_name, errors.ECODE_EXISTS)
    elif self.op.readd and existing_node_info is None:
      raise errors.OpPrereqError("Node %s is not in the configuration" %
                                 node_name, errors.ECODE_NOENT)

    self.changed_primary_ip = False

    for existing_node in self.cfg.GetAllNodesInfo().values():
      if self.op.readd and node_name == existing_node.name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != self.op.primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == self.op.primary_ip or
          existing_node.secondary_ip == self.op.primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # After this 'if' block, None is no longer a valid value for the
    # _capable op attributes
    if self.op.readd:
      assert existing_node_info is not None, \
        "Can't retrieve locked node %s" % node_name
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, getattr(existing_node_info, attr))
    else:
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, True)

    if self.op.readd and not self.op.vm_capable:
      pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % node_name,
                                   errors.ECODE_STATE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = self.cfg.GetMasterNodeInfo()
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == self.op.primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                              source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [existing_node_info.uuid]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False

    if self.op.readd:
      self.new_node = existing_node_info
    else:
      node_group = self.cfg.LookupNodeGroup(self.op.group)
      self.new_node = objects.Node(name=node_name,
                                   primary_ip=self.op.primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=node_group, ndparams={})

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)

    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
    #       it a property on the base class.
    rpcrunner = rpc.DnsOnlyRunner()
    result = rpcrunner.call_version([node_name])[node_name]
    result.Raise("Can't get version information from node %s" % node_name)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node_name, result.payload)
    else:
      raise errors.OpPrereqError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.payload),
                                 errors.ECODE_ENVIRON)

    vg_name = self.cfg.GetVGName()
    if vg_name is not None:
      vparams = {constants.NV_PVLIST: [vg_name]}
      excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
      cname = self.cfg.GetClusterName()
      result = rpcrunner.call_node_verify_light(
          [node_name], vparams, cname,
          self.cfg.GetClusterInfo().hvparams)[node_name]
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
      if errmsgs:
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # We are adding a new node, so we assume it is powered
    self.new_node.powered = True

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      self.new_node.drained = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      self.new_node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        self.new_node.primary_ip = self.op.primary_ip

    # copy the master/vm_capable flags
    for attr in self._NFLAGS:
      setattr(self.new_node, attr, getattr(self.op, attr))

    # notify the user about any possible mc promotion
    if self.new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    if self.op.ndparams:
      self.new_node.ndparams = self.op.ndparams
    else:
      self.new_node.ndparams = {}

    if self.op.hv_state:
      self.new_node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.new_node.disk_state_static = self.new_disk_state

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(
                 master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
                 self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    if self.new_node.secondary_ip != self.new_node.primary_ip:
      _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
                               False)

    node_verifier_uuids = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: ([self.new_node.name], {}),
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(
               node_verifier_uuids, node_verify_param,
               self.cfg.GetClusterName(),
               self.cfg.GetClusterInfo().hvparams)
    for verifier in node_verifier_uuids:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed")

    # OpenvSwitch initialization on the node
    ovs = self.new_node.ndparams.get(constants.ND_OVS, None)
    ovs_name = self.new_node.ndparams.get(constants.ND_OVS_NAME, None)
    ovs_link = self.new_node.ndparams.get(constants.ND_OVS_LINK, None)

    if ovs:
      result = self.rpc.call_node_configure_ovs(
                 self.new_node.name, ovs_name, ovs_link)

    if self.op.readd:
      self.context.ReaddNode(self.new_node)
      RedistributeAncillaryFiles(self)
      # make sure we redistribute the config
      self.cfg.Update(self.new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not self.new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
        result.Warn("Node failed to demote itself from master candidate status",
                    self.LogWarning)
    else:
      self.context.AddNode(self.new_node, self.proc.GetECId())
      RedistributeAncillaryFiles(self)


class LUNodeSetParams(LogicalUnit):
  """Modifies the parameters of a node.

  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
      to the node role (as _ROLE_*)
  @cvar _R2F: a dictionary from node role to tuples of flags
  @cvar _FLAGS: a list of attribute names corresponding to the flags

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
  _F2R = {
    (True, False, False): _ROLE_CANDIDATE,
    (False, True, False): _ROLE_DRAINED,
    (False, False, True): _ROLE_OFFLINE,
    (False, False, False): _ROLE_REGULAR,
    }
  _R2F = dict((v, k) for k, v in _F2R.items())
  _FLAGS = ["master_candidate", "drained", "offline"]
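  # Added note: the flag tuples used as _F2R keys follow the same order as
  # _FLAGS (master_candidate, drained, offline), which is also the order in
  # which CheckPrereq builds old_flags, so the role lookup and the _R2F
  # reverse mapping stay consistent with each other.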

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
                self.op.master_capable, self.op.vm_capable,
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
                self.op.disk_state]
    if all_mods.count(None) == len(all_mods):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # Boolean value that tells us whether we might be demoting from MC
    self.might_demote = (self.op.master_candidate is False or
                         self.op.offline is True or
                         self.op.drained is True or
                         self.op.master_capable is False)

    if self.op.secondary_ip:
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                   " address" % self.op.secondary_ip,
                                   errors.ECODE_INVAL)

    self.lock_all = self.op.auto_promote and self.might_demote
    self.lock_instances = self.op.secondary_ip is not None

  def _InstanceFilter(self, instance):
    """Filter for getting affected instances.

    """
    return (instance.disk_template in constants.DTS_INT_MIRROR and
            self.op.node_uuid in instance.all_nodes)

  def ExpandNames(self):
    if self.lock_all:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,

        # Block allocations when all nodes are locked
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: self.op.node_uuid,
        }

    # Since modifying a node can have severe effects on currently running
    # operations the resource lock is at least acquired in shared mode
    self.needed_locks[locking.LEVEL_NODE_RES] = \
      self.needed_locks[locking.LEVEL_NODE]

    # Get all locks except nodes in shared mode; they are not used for anything
    # but read-only access
    self.share_locks = ShareAll()
    self.share_locks[locking.LEVEL_NODE] = 0
    self.share_locks[locking.LEVEL_NODE_RES] = 0
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 0

    if self.lock_instances:
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(
          self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    if self.lock_instances:
      affected_instances = \
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)

      # Verify instance locks
      owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
      wanted_instance_names = frozenset([inst.name for inst in
                                         affected_instances.values()])
      if wanted_instance_names - owned_instance_names:
        raise errors.OpPrereqError("Instances affected by changing node %s's"
                                   " secondary IP address have changed since"
                                   " locks were acquired, wanted '%s', have"
                                   " '%s'; retry the operation" %
                                   (node.name,
                                    utils.CommaJoin(wanted_instance_names),
                                    utils.CommaJoin(owned_instance_names)),
                                   errors.ECODE_STATE)
    else:
      affected_instances = None

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if node.uuid == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via master-failover",
                                   errors.ECODE_INVAL)

    if self.op.master_candidate and not node.master_capable:
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
                                 " it a master candidate" % node.name,
                                 errors.ECODE_STATE)

    if self.op.vm_capable is False:
      (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
      if ipri or isec:
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
                                   " the vm_capable flag" % node.name,
                                   errors.ECODE_STATE)

    if node.master_candidate and self.might_demote and not self.lock_all:
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
      # check if after removing the current node, we're missing master
      # candidates
      (mc_remaining, mc_should, _) = \
          self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
      if mc_remaining < mc_should:
        raise errors.OpPrereqError("Not enough master candidates, please"
                                   " pass auto promote option to allow"
                                   " promotion (--auto-promote or RAPI"
                                   " auto_promote=True)", errors.ECODE_STATE)

    self.old_flags = old_flags = (node.master_candidate,
                                  node.drained, node.offline)
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
    self.old_role = old_role = self._F2R[old_flags]

    # Check for ineffective changes
    for attr in self._FLAGS:
      if getattr(self.op, attr) is False and getattr(node, attr) is False:
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
        setattr(self.op, attr, None)

    # Past this point, any flag change to False means a transition
    # away from the respective state, as only real changes are kept

    # TODO: We might query the real power state if it supports OOB
    if SupportsOob(self.cfg, node):
      if self.op.offline is False and not (node.powered or
                                           self.op.powered is True):
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
                                    " offline status can be reset") %
                                   self.op.node_name, errors.ECODE_STATE)
    elif self.op.powered is not None:
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
                                  " as it does not support out-of-band"
                                  " handling") % self.op.node_name,
                                 errors.ECODE_STATE)

    # If we're being deofflined/drained, we'll MC ourself if needed
    if (self.op.drained is False or self.op.offline is False or
        (self.op.master_capable and not node.master_capable)):
      if _DecideSelfPromotion(self):
        self.op.master_candidate = True
        self.LogInfo("Auto-promoting node to master candidate")

    # If we're no longer master capable, we'll demote ourselves from MC
    if self.op.master_capable is False and node.master_candidate:
      self.LogInfo("Demoting from master candidate")
      self.op.master_candidate = False

    # Compute new role
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
    if self.op.master_candidate:
      new_role = self._ROLE_CANDIDATE
    elif self.op.drained:
      new_role = self._ROLE_DRAINED
    elif self.op.offline:
      new_role = self._ROLE_OFFLINE
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
      # False is still in new flags, which means we're un-setting (the
      # only) True flag
      new_role = self._ROLE_REGULAR
    else: # no new flags, nothing, keep old role
      new_role = old_role
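    # Added note, an example of the chain above: passing offline=False for a
    # node that is currently offline matches none of the True branches, but
    # False is present among the new flags, so the node falls back to
    # _ROLE_REGULAR (unless it was auto-promoted to master candidate earlier
    # in this method, in which case _ROLE_CANDIDATE wins).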

    self.new_role = new_role

    if old_role == self._ROLE_OFFLINE and new_role != old_role:
      # Trying to transition out of offline status
      result = self.rpc.call_version([node.uuid])[node.uuid]
      if result.fail_msg:
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
                                   " to report its version: %s" %
                                   (node.name, result.fail_msg),
                                   errors.ECODE_STATE)
      else:
        self.LogWarning("Transitioning node from offline to online state"
                        " without using re-add. Please make sure the node"
                        " is healthy!")

    # When changing the secondary ip, verify if this is a single-homed to
    # multi-homed transition or vice versa, and apply the relevant
    # restrictions.
    if self.op.secondary_ip:
      # Ok even without locking, because this can't be changed by any LU
      master = self.cfg.GetMasterNodeInfo()
      master_singlehomed = master.secondary_ip == master.primary_ip
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from single-homed to multi-homed"
                          " cluster; all nodes will require a secondary IP"
                          " address")
        else:
          raise errors.OpPrereqError("Changing the secondary ip on a"
                                     " single-homed cluster requires the"
                                     " --force option to be passed, and the"
                                     " target node to be the master",
                                     errors.ECODE_INVAL)
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from multi-homed to single-homed"
                          " cluster; secondary IP addresses will have to be"
                          " removed")
        else:
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
                                     " same as the primary IP on a multi-homed"
                                     " cluster, unless the --force option is"
                                     " passed, and the target node is the"
                                     " master", errors.ECODE_INVAL)

      assert not (set([inst.name for inst in affected_instances.values()]) -
                  self.owned_locks(locking.LEVEL_INSTANCE))

      if node.offline:
        if affected_instances:
          msg = ("Cannot change secondary IP address: offline node has"
                 " instances (%s) configured to use it" %
                 utils.CommaJoin(
                   [inst.name for inst in affected_instances.values()]))
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
      else:
        # On online nodes, check that no instances are running, and that
        # the node has the new ip and we can reach it.
        for instance in affected_instances.values():
          CheckInstanceState(self, instance, INSTANCE_DOWN,
                             msg="cannot change secondary ip")

        _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
        if master.uuid != node.uuid:
          # check reachability from master secondary ip to new secondary ip
          if not netutils.TcpPing(self.op.secondary_ip,
                                  constants.DEFAULT_NODED_PORT,
                                  source=master.secondary_ip):
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                       " based ping to node daemon port",
                                       errors.ECODE_ENVIRON)

    if self.op.ndparams:
      new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")
      self.new_ndparams = new_ndparams

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                                node.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    result = []

    if self.op.ndparams:
      node.ndparams = self.new_ndparams

    if self.op.powered is not None:
      node.powered = self.op.powered

    if self.op.hv_state:
      node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      node.disk_state_static = self.new_disk_state

    for attr in ["master_capable", "vm_capable"]:
      val = getattr(self.op, attr)
      if val is not None:
        setattr(node, attr, val)
        result.append((attr, str(val)))

    if self.new_role != self.old_role:
      # Tell the node to demote itself, if no longer MC and not offline
      if self.old_role == self._ROLE_CANDIDATE and \
          self.new_role != self._ROLE_OFFLINE:
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s", msg)

      new_flags = self._R2F[self.new_role]
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
        if of != nf:
          result.append((desc, str(nf)))
      (node.master_candidate, node.drained, node.offline) = new_flags

      # we locked all nodes, we adjust the CP before updating this node
      if self.lock_all:
        AdjustCandidatePool(self, [node.uuid])

    if self.op.secondary_ip:
      node.secondary_ip = self.op.secondary_ip
      result.append(("secondary_ip", self.op.secondary_ip))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)

    # this will trigger job queue propagation or cleanup if the mc
    # flag changed
    if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
      self.context.ReaddNode(node)

    return result
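    # Added note: the returned list holds (parameter, new value) pairs built
    # up above, i.e. the set of changes that were actually applied and that
    # the caller can report back as feedback.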


class LUNodePowercycle(NoHooksLU):
  """Powercycles a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    default_hypervisor = self.cfg.GetHypervisorType()
    hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
    result = self.rpc.call_node_powercycle(self.op.node_uuid,
                                           default_hypervisor,
                                           hvparams)
    result.Raise("Failed to schedule the reboot")
    return result.payload


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodePrimaryInstances(cfg, node_uuid):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_uuid):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid in inst.secondary_nodes)


def _GetNodeInstances(cfg, node_uuid):
  """Returns a list of all primary and secondary instances on a node.

  """

  return _GetNodeInstancesInner(cfg, lambda inst: node_uuid in inst.all_nodes)


class LUNodeEvacuate(NoHooksLU):
  """Evacuates instances off a list of nodes.

  """
  REQ_BGL = False

  _MODE2IALLOCATOR = {
    constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI,
    constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC,
    constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL,
    }
  assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES
  assert (frozenset(_MODE2IALLOCATOR.values()) ==
          constants.IALLOCATOR_NEVAC_MODES)
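  # Added note: the two asserts above tie this mapping to the constants, so
  # adding a new node evacuation mode without a matching iallocator evacuation
  # mode fails loudly when the module is imported rather than at run time.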

  def CheckArguments(self):
    CheckIAllocatorOrNode(self, "iallocator", "remote_node")

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)
      assert self.op.remote_node

      if self.op.node_uuid == self.op.remote_node_uuid:
        raise errors.OpPrereqError("Can not use evacuated node as a new"
                                   " secondary node", errors.ECODE_INVAL)

      if self.op.mode != constants.NODE_EVAC_SEC:
        raise errors.OpPrereqError("Without the use of an iallocator only"
                                   " secondary instances can be evacuated",
                                   errors.ECODE_INVAL)

    # Declare locks
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

    # Determine nodes (via group) optimistically, needs verification once locks
    # have been acquired
    self.lock_nodes = self._DetermineNodes()

  def _DetermineNodes(self):
    """Gets the list of node UUIDs to operate on.

    """
    if self.op.remote_node is None:
      # Iallocator will choose any node(s) in the same group
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
    else:
      group_nodes = frozenset([self.op.remote_node_uuid])

    # Determine nodes to be locked
    return set([self.op.node_uuid]) | group_nodes

  def _DetermineInstances(self):
    """Builds list of instances to operate on.

    """
    assert self.op.mode in constants.NODE_EVAC_MODES

    if self.op.mode == constants.NODE_EVAC_PRI:
      # Primary instances only
      inst_fn = _GetNodePrimaryInstances
      assert self.op.remote_node is None, \
        "Evacuating primary instances requires iallocator"
    elif self.op.mode == constants.NODE_EVAC_SEC:
      # Secondary instances only
      inst_fn = _GetNodeSecondaryInstances
    else:
      # All instances
      assert self.op.mode == constants.NODE_EVAC_ALL
      inst_fn = _GetNodeInstances
      # TODO: In 2.6, change the iallocator interface to take an evacuation mode
      # per instance
      raise errors.OpPrereqError("Due to an issue with the iallocator"
                                 " interface it is not possible to evacuate"
                                 " all instances at once; specify explicitly"
                                 " whether to evacuate primary or secondary"
                                 " instances",
                                 errors.ECODE_INVAL)

    return inst_fn(self.cfg, self.op.node_uuid)

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        set(i.name for i in self._DetermineInstances())

    elif level == locking.LEVEL_NODEGROUP:
      # Lock node groups for all potential target nodes optimistically, needs
      # verification once nodes have been acquired
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)

    elif level == locking.LEVEL_NODE:
      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes

  def CheckPrereq(self):
    # Verify locks
    owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)

    need_nodes = self._DetermineNodes()

    if not owned_nodes.issuperset(need_nodes):
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
                                 " locks were acquired, current nodes are"
                                 " '%s', used to be '%s'; retry the"
                                 " operation" %
                                 (self.op.node_name,
                                  utils.CommaJoin(need_nodes),
                                  utils.CommaJoin(owned_nodes)),
                                 errors.ECODE_STATE)

    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
    if owned_groups != wanted_groups:
      raise errors.OpExecError("Node groups changed since locks were acquired,"
                               " current groups are '%s', used to be '%s';"
                               " retry the operation" %
                               (utils.CommaJoin(wanted_groups),
                                utils.CommaJoin(owned_groups)))

    # Determine affected instances
    self.instances = self._DetermineInstances()
    self.instance_names = [i.name for i in self.instances]

    if set(self.instance_names) != owned_instance_names:
      raise errors.OpExecError("Instances on node '%s' changed since locks"
                               " were acquired, current instances are '%s',"
                               " used to be '%s'; retry the operation" %
                               (self.op.node_name,
                                utils.CommaJoin(self.instance_names),
                                utils.CommaJoin(owned_instance_names)))

    if self.instance_names:
      self.LogInfo("Evacuating instances from node '%s': %s",
                   self.op.node_name,
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
    else:
      self.LogInfo("No instances to evacuate from node '%s'",
                   self.op.node_name)

    if self.op.remote_node is not None:
      for i in self.instances:
        if i.primary_node == self.op.remote_node_uuid:
          raise errors.OpPrereqError("Node %s is the primary node of"
                                     " instance %s, cannot use it as"
                                     " secondary" %
                                     (self.op.remote_node, i.name),
                                     errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)

    if not self.instance_names:
      # No instances to evacuate
      jobs = []

    elif self.op.iallocator is not None:
      # TODO: Implement relocation to other group
      evac_mode = self._MODE2IALLOCATOR[self.op.mode]
      req = iallocator.IAReqNodeEvac(evac_mode=evac_mode,
                                     instances=list(self.instance_names))
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)

      ial.Run(self.op.iallocator)

      if not ial.success:
        raise errors.OpPrereqError("Can't compute node evacuation using"
                                   " iallocator '%s': %s" %
                                   (self.op.iallocator, ial.info),
                                   errors.ECODE_NORES)

      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)

    elif self.op.remote_node is not None:
      assert self.op.mode == constants.NODE_EVAC_SEC
      jobs = [
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
                                        remote_node=self.op.remote_node,
                                        disks=[],
                                        mode=constants.REPLACE_DISK_CHG,
                                        early_release=self.op.early_release)]
        for instance_name in self.instance_names]

    else:
      raise errors.ProgrammerError("No iallocator or remote node")

    return ResultWithJobs(jobs)
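    # Added note (assumption about ResultWithJobs semantics, not stated in
    # this file): nothing is moved here directly; the LU only builds the job
    # sets (one list per instance) and hands them back so they can be
    # submitted as separate jobs instead of being executed inside this opcode.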


class LUNodeMigrate(LogicalUnit):
  """Migrate all instances from a node.

  """
  HPATH = "node-migrate"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False

  def CheckArguments(self):
    pass

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    return {
      "NODE_NAME": self.op.node_name,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()]
    return (nl, nl)

  def CheckPrereq(self):
    pass

  def Exec(self, feedback_fn):
    # Prepare jobs for migrating instances
    jobs = [
      [opcodes.OpInstanceMigrate(
        instance_name=inst.name,
        mode=self.op.mode,
        live=self.op.live,
        iallocator=self.op.iallocator,
        target_node=self.op.target_node,
        allow_runtime_changes=self.op.allow_runtime_changes,
        ignore_ipolicy=self.op.ignore_ipolicy)]
      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]

    # TODO: Run iallocator in this opcode and pass correct placement options to
    # OpInstanceMigrate. Since other jobs can modify the cluster between
    # running the iallocator and the actual migration, a good consistency model
    # will have to be found.

    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset([self.op.node_uuid]))

    return ResultWithJobs(jobs)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]

  return []
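  # Added note: for ST_FILE the single returned element is a list with the
  # regular and shared file storage directories, matching the comment above
  # that storage.FileStorage expects a list of directories; every other
  # storage type gets no extra arguments.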


class LUNodeModifyStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_uuid,
      }

  def Exec(self, feedback_fn):
    """Modifies a storage volume on a node.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_uuid,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


class NodeQuery(QueryBase):
  FIELDS = query.NODE_FIELDS

  def ExpandNames(self, lu):
    lu.needed_locks = {}
    lu.share_locks = ShareAll()

    if self.names:
      (self.wanted, _) = GetWantedNodes(lu, self.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = (self.use_locking and
                       query.NQ_LIVE in self.requested_data)

    if self.do_locking:
      # If any non-static field is requested we need to lock the nodes
      lu.needed_locks[locking.LEVEL_NODE] = self.wanted
      lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    all_info = lu.cfg.GetAllNodesInfo()

    node_uuids = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)

    # Gather data as requested
    if query.NQ_LIVE in self.requested_data:
      # filter out non-vm_capable nodes
      toquery_node_uuids = [node.uuid for node in all_info.values()
                            if node.vm_capable and node.uuid in node_uuids]
      lvm_enabled = utils.storage.IsLvmEnabled(
          lu.cfg.GetClusterInfo().enabled_disk_templates)
      # FIXME: this per default asks for storage space information for all
      # enabled disk templates. Fix this by making it possible to specify
      # space report fields for specific disk templates.
      raw_storage_units = utils.storage.GetStorageUnitsOfCluster(
          lu.cfg, include_spindles=lvm_enabled)
      storage_units = rpc.PrepareStorageUnitsForNodes(
          lu.cfg, raw_storage_units, toquery_node_uuids)
      default_hypervisor = lu.cfg.GetHypervisorType()
      hvparams = lu.cfg.GetClusterInfo().hvparams[default_hypervisor]
      hvspecs = [(default_hypervisor, hvparams)]
      node_data = lu.rpc.call_node_info(toquery_node_uuids, storage_units,
                                        hvspecs)
      live_data = dict(
          (uuid, rpc.MakeLegacyNodeInfo(nresult.payload,
                                        require_spindles=lvm_enabled))
          for (uuid, nresult) in node_data.items()
          if not nresult.fail_msg and nresult.payload)
    else:
      live_data = None

    if query.NQ_INST in self.requested_data:
      node_to_primary = dict([(uuid, set()) for uuid in node_uuids])
      node_to_secondary = dict([(uuid, set()) for uuid in node_uuids])

      inst_data = lu.cfg.GetAllInstancesInfo()
      inst_uuid_to_inst_name = {}

      for inst in inst_data.values():
        inst_uuid_to_inst_name[inst.uuid] = inst.name
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.uuid)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.uuid)
    else:
      node_to_primary = None
      node_to_secondary = None
      inst_uuid_to_inst_name = None

    if query.NQ_OOB in self.requested_data:
      oob_support = dict((uuid, bool(SupportsOob(lu.cfg, node)))
                         for uuid, node in all_info.iteritems())
    else:
      oob_support = None

    if query.NQ_GROUP in self.requested_data:
      groups = lu.cfg.GetAllNodeGroupsInfo()
    else:
      groups = {}

    return query.NodeQueryData([all_info[uuid] for uuid in node_uuids],
                               live_data, lu.cfg.GetMasterNode(),
                               node_to_primary, node_to_secondary,
                               inst_uuid_to_inst_name, groups, oob_support,
                               lu.cfg.GetClusterInfo())
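    # Added note: each of the blocks above is only evaluated when the
    # corresponding NQ_* field group was requested; otherwise the placeholders
    # passed to query.NodeQueryData stay None (or an empty dict for groups)
    # and only static configuration data is available to the query layer.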
class LUNodeQuery(NoHooksLU):
  """Logical unit for querying nodes.

  """
  # pylint: disable=W0142
  REQ_BGL = False

  def CheckArguments(self):
    self.nq = NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
                        self.op.output_fields, self.op.use_locking)

  def ExpandNames(self):
    self.nq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.nq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    return self.nq.OldStyleQuery(self)


def _CheckOutputFields(fields, selected):
  """Checks whether all selected output fields are valid.

  @type fields: L{utils.FieldSet}
  @param fields: the set of valid output fields
  @type selected: list of strings
  @param selected: the list of requested output fields to validate

  """
  delta = fields.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)

class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(constants.VF_NODE, constants.VF_PHYS,
                                      constants.VF_VG, constants.VF_NAME,
                                      constants.VF_SIZE, constants.VF_INSTANCE),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def Exec(self, feedback_fn):
    """Computes the list of volumes on the node(s) and their attributes.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    volumes = self.rpc.call_node_volumes(node_uuids)

    ilist = self.cfg.GetAllInstancesInfo()
    vol2inst = MapInstanceLvsToNodes(ilist.values())
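    # vol2inst maps (node UUID, "vg/lv_name") to the instance owning that
    # logical volume; used below to fill the "instance" output field.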

    output = []
    for node_uuid in node_uuids:
      nresult = volumes[node_uuid]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue

      node_vols = sorted(nresult.payload,
                         key=operator.itemgetter(constants.VF_DEV))

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == constants.VF_NODE:
            val = self.cfg.GetNodeName(node_uuid)
          elif field == constants.VF_PHYS:
            val = vol[constants.VF_DEV]
          elif field == constants.VF_VG:
            val = vol[constants.VF_VG]
          elif field == constants.VF_NAME:
            val = vol[constants.VF_NAME]
          elif field == constants.VF_SIZE:
            val = int(float(vol[constants.VF_SIZE]))
          elif field == constants.VF_INSTANCE:
            inst = vol2inst.get((node_uuid, vol[constants.VF_VG] + "/" +
                                 vol[constants.VF_NAME]), None)
            if inst is not None:
              val = inst.name
            else:
              val = "-"
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output
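

# Illustrative sketch, not part of the original module: LUNodeQueryStorage
# below serves storage unit listings, e.g. an opcode built roughly as
#   opcodes.OpNodeQueryStorage(nodes=[], storage_type=constants.ST_LVM_VG,
#                              output_fields=["name", "size", "free"])
# where the storage type and field names are examples only.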
class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

  def Exec(self, feedback_fn):
    """Computes the list of storage units and their attributes.

    """
    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      nresult = data[node_uuid]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s",
                        node_name, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])
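      # Index the payload rows by storage unit name so they can be emitted
      # in NiceSort order below.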

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node_name
          elif field == constants.SF_TYPE:
            val = self.op.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result
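

# Illustrative sketch, not part of the original module: LUNodeRemove below
# implements node removal, e.g. an opcode built roughly as
#   opcodes.OpNodeRemove(node_name="node3.example.com")
# with a placeholder node name; CheckPrereq refuses to remove the master
# node or a node that still hosts instances.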
class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase, as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_uuid)
    except ValueError:
      pass
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    assert node is not None

    masternode = self.cfg.GetMasterNode()
    if node.uuid == masternode:
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)

    for _, instance in self.cfg.GetAllInstancesInfo().items():
      if node.uuid in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove it first" % instance.name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    logging.info("Stopping the node daemon and removing configs from node %s",
                 self.node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # Promote nodes to master candidate as needed
    AdjustCandidatePool(self, exceptions=[self.node.uuid])
    self.context.RemoveNode(self.node)

    # Run post hooks on the node before it's removed
    RunPostHook(self, self.node.name)

    # We have to call this by name rather than by UUID, as the node is no
    # longer in the config.
    result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node_uuid = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node_uuid,
                                              constants.ETC_HOSTS_REMOVE,
                                              self.node.name, None)
      result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)
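

# Illustrative sketch, not part of the original module: LURepairNodeStorage
# below backs storage repair requests, e.g. an opcode built roughly as
#   opcodes.OpRepairNodeStorage(node_name="node3.example.com",
#                               storage_type=constants.ST_LVM_VG,
#                               name="xenvg", ignore_consistency=False)
# where the node and volume group names are placeholders; only storage types
# listed in constants.VALID_STORAGE_OPERATIONS can be repaired.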
class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    if (constants.SO_FIX_CONSISTENCY not in
        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
      raise errors.OpPrereqError("Storage units of type '%s' cannot be"
                                 " repaired" % storage_type,
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def _CheckFaultyDisks(self, instance, node_uuid):
    """Ensure faulty disks abort the opcode or at least warn."""
    try:
      if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                 node_uuid, True):
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                   " node '%s'" %
                                   (instance.name,
                                    self.cfg.GetNodeName(node_uuid)),
                                   errors.ECODE_STATE)
    except errors.OpPrereqError, err:
      if self.op.ignore_consistency:
        self.LogWarning(str(err.args[0]))
      else:
        raise

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

    # Check whether any instance on this node has faulty disks
    for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
      if not inst.disks_active:
        continue
      check_nodes = set(inst.all_nodes)
      check_nodes.discard(self.op.node_uuid)
      for inst_node_uuid in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_uuid)

  def Exec(self, feedback_fn):
    """Repairs a storage unit on the node.

    """
    feedback_fn("Repairing storage unit '%s' on %s ..." %
                (self.op.name, self.op.node_name))

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_execute(self.op.node_uuid,
                                           self.op.storage_type, st_args,
                                           self.op.name,
                                           constants.SO_FIX_CONSISTENCY)
    result.Raise("Failed to repair storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))