#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with nodes."""

import logging
import operator

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import qlang
from ganeti import query
from ganeti import rpc
from ganeti import utils
from ganeti.masterd import iallocator

from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
  ResultWithJobs
from ganeti.cmdlib.common import CheckParamsNotGlobal, \
  MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  IsExclusiveStorageEnabledNode, CheckNodePVs, \
  RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
  CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
  AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
  GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
  FindFaultyInstanceDisks, CheckStorageTypeEnabled


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max by one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should


def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: L{objects.Node}
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip,
  and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  # this can be called with a new node, which has no UUID yet, so perform the
  # RPC call using its name
  result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node.name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)


class LUNodeAdd(LogicalUnit):
  """Logical unit for adding a node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _NFLAGS = ["master_capable", "vm_capable"]

  def CheckArguments(self):
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
    # validate/normalize the node name
    self.hostname = netutils.GetHostname(name=self.op.node_name,
                                         family=self.primary_ip_family)
    self.op.node_name = self.hostname.name

    if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
      raise errors.OpPrereqError("Cannot readd the master node",
                                 errors.ECODE_STATE)

    if self.op.readd and self.op.group:
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                 " being readded", errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    hook_nodes = self.cfg.GetNodeList()
    new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
    if new_node_info is not None:
      # Exclude added node
      hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))

    # add the new node as post hook node by name; it does not have a UUID yet
    return (hook_nodes, hook_nodes, [self.op.node_name, ])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.hostname.name
    self.op.primary_ip = self.hostname.ip
    if self.op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
                                   " IPv4 address must be given as secondary",
                                   errors.ECODE_INVAL)
      self.op.secondary_ip = self.op.primary_ip

    secondary_ip = self.op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip, errors.ECODE_INVAL)

    existing_node_info = self.cfg.GetNodeInfoByName(node_name)
    if not self.op.readd and existing_node_info is not None:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node_name, errors.ECODE_EXISTS)
    elif self.op.readd and existing_node_info is None:
      raise errors.OpPrereqError("Node %s is not in the configuration" %
                                 node_name, errors.ECODE_NOENT)

    self.changed_primary_ip = False

    for existing_node in self.cfg.GetAllNodesInfo().values():
      if self.op.readd and node_name == existing_node.name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != self.op.primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == self.op.primary_ip or
          existing_node.secondary_ip == self.op.primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # After this 'if' block, None is no longer a valid value for the
    # _capable op attributes
    if self.op.readd:
      assert existing_node_info is not None, \
        "Can't retrieve locked node %s" % node_name
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, getattr(existing_node_info, attr))
    else:
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, True)

    if self.op.readd and not self.op.vm_capable:
      pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % node_name,
                                   errors.ECODE_STATE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = self.cfg.GetMasterNodeInfo()
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == self.op.primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                              source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [existing_node_info.uuid]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False

    if self.op.readd:
      self.new_node = existing_node_info
    else:
      node_group = self.cfg.LookupNodeGroup(self.op.group)
      self.new_node = objects.Node(name=node_name,
                                   primary_ip=self.op.primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=node_group, ndparams={})

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)

    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
    #       it a property on the base class.
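    # The node may not yet be part of the cluster configuration, so its name
    # has to be resolved via DNS for the RPC calls below.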
    rpcrunner = rpc.DnsOnlyRunner()
    result = rpcrunner.call_version([node_name])[node_name]
    result.Raise("Can't get version information from node %s" % node_name)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node_name, result.payload)
    else:
      raise errors.OpPrereqError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.payload),
                                 errors.ECODE_ENVIRON)

    vg_name = self.cfg.GetVGName()
    if vg_name is not None:
      vparams = {constants.NV_PVLIST: [vg_name]}
      excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
      cname = self.cfg.GetClusterName()
      result = rpcrunner.call_node_verify_light(
          [node_name], vparams, cname,
          self.cfg.GetClusterInfo().hvparams)[node_name]
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
      if errmsgs:
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)

  def _InitOpenVSwitch(self):
    filled_ndparams = self.cfg.GetClusterInfo().FillND(
      self.new_node, self.cfg.GetNodeGroup(self.new_node.group))

    ovs = filled_ndparams.get(constants.ND_OVS, None)
    ovs_name = filled_ndparams.get(constants.ND_OVS_NAME, None)
    ovs_link = filled_ndparams.get(constants.ND_OVS_LINK, None)

    if ovs:
      if not ovs_link:
        self.LogInfo("No physical interface for OpenvSwitch was given."
                     " OpenvSwitch will not have an outside connection. This"
                     " might not be what you want.")

      result = self.rpc.call_node_configure_ovs(
                 self.new_node.name, ovs_name, ovs_link)
      result.Raise("Failed to initialize OpenVSwitch on new node")

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # We are adding a new node, so we assume it's powered
    self.new_node.powered = True

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      self.new_node.offline = False
      self.new_node.drained = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      self.new_node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        self.new_node.primary_ip = self.op.primary_ip

    # copy the master/vm_capable flags
    for attr in self._NFLAGS:
      setattr(self.new_node, attr, getattr(self.op, attr))

    # notify the user about any possible mc promotion
    if self.new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    if self.op.ndparams:
      self.new_node.ndparams = self.op.ndparams
    else:
      self.new_node.ndparams = {}

    if self.op.hv_state:
      self.new_node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.new_node.disk_state_static = self.new_disk_state

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(
                 master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
                 self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    if self.new_node.secondary_ip != self.new_node.primary_ip:
      _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
                               False)

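    # have the master verify ssh connectivity and the hostname of the new
    # node before the node is actually added to the configuration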
    node_verifier_uuids = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: ([self.new_node.name], {}),
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(
               node_verifier_uuids, node_verify_param,
               self.cfg.GetClusterName(),
               self.cfg.GetClusterInfo().hvparams)
    for verifier in node_verifier_uuids:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed")

    self._InitOpenVSwitch()

    if self.op.readd:
      self.context.ReaddNode(self.new_node)
      RedistributeAncillaryFiles(self)
      # make sure we redistribute the config
      self.cfg.Update(self.new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not self.new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
        result.Warn("Node failed to demote itself from master candidate status",
                    self.LogWarning)
    else:
      self.context.AddNode(self.new_node, self.proc.GetECId())
      RedistributeAncillaryFiles(self)


class LUNodeSetParams(LogicalUnit):
  """Modifies the parameters of a node.

  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
      to the node role (as _ROLE_*)
  @cvar _R2F: a dictionary from node role to tuples of flags
  @cvar _FLAGS: a list of attribute names corresponding to the flags

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
  _F2R = {
    (True, False, False): _ROLE_CANDIDATE,
    (False, True, False): _ROLE_DRAINED,
    (False, False, True): _ROLE_OFFLINE,
    (False, False, False): _ROLE_REGULAR,
    }
  _R2F = dict((v, k) for k, v in _F2R.items())
  _FLAGS = ["master_candidate", "drained", "offline"]

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
                self.op.master_capable, self.op.vm_capable,
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
                self.op.disk_state]
    if all_mods.count(None) == len(all_mods):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # Boolean value that tells us whether we might be demoting from MC
    self.might_demote = (self.op.master_candidate is False or
                         self.op.offline is True or
                         self.op.drained is True or
                         self.op.master_capable is False)

    if self.op.secondary_ip:
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                   " address" % self.op.secondary_ip,
                                   errors.ECODE_INVAL)

464
    self.lock_instances = self.op.secondary_ip is not None
465

    
466
  def _InstanceFilter(self, instance):
467
    """Filter for getting affected instances.
468

469
    """
470
    return (instance.disk_template in constants.DTS_INT_MIRROR and
471
            self.op.node_uuid in instance.all_nodes)
472

    
473
  def ExpandNames(self):
474
    if self.lock_all:
475
      self.needed_locks = {
476
        locking.LEVEL_NODE: locking.ALL_SET,
477

    
478
        # Block allocations when all nodes are locked
479
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
480
        }
481
    else:
482
      self.needed_locks = {
483
        locking.LEVEL_NODE: self.op.node_uuid,
484
        }
485

    
486
    # Since modifying a node can have severe effects on currently running
487
    # operations the resource lock is at least acquired in shared mode
488
    self.needed_locks[locking.LEVEL_NODE_RES] = \
489
      self.needed_locks[locking.LEVEL_NODE]
490

    
491
    # Get all locks except nodes in shared mode; they are not used for anything
492
    # but read-only access
493
    self.share_locks = ShareAll()
494
    self.share_locks[locking.LEVEL_NODE] = 0
495
    self.share_locks[locking.LEVEL_NODE_RES] = 0
496
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 0
497

    
498
    if self.lock_instances:
499
      self.needed_locks[locking.LEVEL_INSTANCE] = \
500
        self.cfg.GetInstanceNames(
501
          self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())
502

    
503
  def BuildHooksEnv(self):
504
    """Build hooks env.
505

506
    This runs on the master node.
507

508
    """
509
    return {
510
      "OP_TARGET": self.op.node_name,
511
      "MASTER_CANDIDATE": str(self.op.master_candidate),
512
      "OFFLINE": str(self.op.offline),
513
      "DRAINED": str(self.op.drained),
514
      "MASTER_CAPABLE": str(self.op.master_capable),
515
      "VM_CAPABLE": str(self.op.vm_capable),
516
      }
517

    
518
  def BuildHooksNodes(self):
519
    """Build hooks nodes.
520

521
    """
522
    nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
523
    return (nl, nl)
524

    
525
  def CheckPrereq(self):
526
    """Check prerequisites.
527

528
    This only checks the instance list against the existing names.
529

530
    """
531
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
532
    if self.lock_instances:
533
      affected_instances = \
534
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
535

    
536
      # Verify instance locks
537
      owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
538
      wanted_instance_names = frozenset([inst.name for inst in
539
                                         affected_instances.values()])
540
      if wanted_instance_names - owned_instance_names:
541
        raise errors.OpPrereqError("Instances affected by changing node %s's"
542
                                   " secondary IP address have changed since"
543
                                   " locks were acquired, wanted '%s', have"
544
                                   " '%s'; retry the operation" %
545
                                   (node.name,
546
                                    utils.CommaJoin(wanted_instance_names),
547
                                    utils.CommaJoin(owned_instance_names)),
548
                                   errors.ECODE_STATE)
549
    else:
550
      affected_instances = None
551

    
552
    if (self.op.master_candidate is not None or
553
        self.op.drained is not None or
554
        self.op.offline is not None):
555
      # we can't change the master's node flags
556
      if node.uuid == self.cfg.GetMasterNode():
557
        raise errors.OpPrereqError("The master role can be changed"
558
                                   " only via master-failover",
559
                                   errors.ECODE_INVAL)
560

    
561
    if self.op.master_candidate and not node.master_capable:
562
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
563
                                 " it a master candidate" % node.name,
564
                                 errors.ECODE_STATE)
565

    
566
    if self.op.vm_capable is False:
567
      (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
568
      if ipri or isec:
569
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
570
                                   " the vm_capable flag" % node.name,
571
                                   errors.ECODE_STATE)
572

    
573
    if node.master_candidate and self.might_demote and not self.lock_all:
574
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
575
      # check if after removing the current node, we're missing master
576
      # candidates
577
      (mc_remaining, mc_should, _) = \
578
          self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
579
      if mc_remaining < mc_should:
580
        raise errors.OpPrereqError("Not enough master candidates, please"
581
                                   " pass auto promote option to allow"
582
                                   " promotion (--auto-promote or RAPI"
583
                                   " auto_promote=True)", errors.ECODE_STATE)
584

    
585
    self.old_flags = old_flags = (node.master_candidate,
586
                                  node.drained, node.offline)
587
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
588
    self.old_role = old_role = self._F2R[old_flags]
589

    
590
    # Check for ineffective changes
591
    for attr in self._FLAGS:
592
      if getattr(self.op, attr) is False and getattr(node, attr) is False:
593
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
594
        setattr(self.op, attr, None)
595

    
596
    # Past this point, any flag change to False means a transition
597
    # away from the respective state, as only real changes are kept
598

    
599
    # TODO: We might query the real power state if it supports OOB
600
    if SupportsOob(self.cfg, node):
601
      if self.op.offline is False and not (node.powered or
602
                                           self.op.powered is True):
603
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
604
                                    " offline status can be reset") %
605
                                   self.op.node_name, errors.ECODE_STATE)
606
    elif self.op.powered is not None:
607
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
608
                                  " as it does not support out-of-band"
609
                                  " handling") % self.op.node_name,
610
                                 errors.ECODE_STATE)
611

    
612
    # If we're being deofflined/drained, we'll MC ourself if needed
613
    if (self.op.drained is False or self.op.offline is False or
614
        (self.op.master_capable and not node.master_capable)):
615
      if _DecideSelfPromotion(self):
616
        self.op.master_candidate = True
617
        self.LogInfo("Auto-promoting node to master candidate")
618

    
619
    # If we're no longer master capable, we'll demote ourselves from MC
620
    if self.op.master_capable is False and node.master_candidate:
621
      self.LogInfo("Demoting from master candidate")
622
      self.op.master_candidate = False
623

    
624
    # Compute new role
625
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
626
    if self.op.master_candidate:
627
      new_role = self._ROLE_CANDIDATE
628
    elif self.op.drained:
629
      new_role = self._ROLE_DRAINED
630
    elif self.op.offline:
631
      new_role = self._ROLE_OFFLINE
632
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
633
      # False is still in new flags, which means we're un-setting (the
634
      # only) True flag
635
      new_role = self._ROLE_REGULAR
636
    else: # no new flags, nothing, keep old role
637
      new_role = old_role
638

    
639
    self.new_role = new_role
640

    
641
    if old_role == self._ROLE_OFFLINE and new_role != old_role:
642
      # Trying to transition out of offline status
643
      result = self.rpc.call_version([node.uuid])[node.uuid]
644
      if result.fail_msg:
645
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
646
                                   " to report its version: %s" %
647
                                   (node.name, result.fail_msg),
648
                                   errors.ECODE_STATE)
649
      else:
650
        self.LogWarning("Transitioning node from offline to online state"
651
                        " without using re-add. Please make sure the node"
652
                        " is healthy!")
653

    
654
    # When changing the secondary ip, verify if this is a single-homed to
655
    # multi-homed transition or vice versa, and apply the relevant
656
    # restrictions.
657
    if self.op.secondary_ip:
658
      # Ok even without locking, because this can't be changed by any LU
659
      master = self.cfg.GetMasterNodeInfo()
660
      master_singlehomed = master.secondary_ip == master.primary_ip
661
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
662
        if self.op.force and node.uuid == master.uuid:
663
          self.LogWarning("Transitioning from single-homed to multi-homed"
664
                          " cluster; all nodes will require a secondary IP"
665
                          " address")
666
        else:
667
          raise errors.OpPrereqError("Changing the secondary ip on a"
668
                                     " single-homed cluster requires the"
669
                                     " --force option to be passed, and the"
670
                                     " target node to be the master",
671
                                     errors.ECODE_INVAL)
672
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
673
        if self.op.force and node.uuid == master.uuid:
674
          self.LogWarning("Transitioning from multi-homed to single-homed"
675
                          " cluster; secondary IP addresses will have to be"
676
                          " removed")
677
        else:
678
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
679
                                     " same as the primary IP on a multi-homed"
680
                                     " cluster, unless the --force option is"
681
                                     " passed, and the target node is the"
682
                                     " master", errors.ECODE_INVAL)
683

    
684
      assert not (set([inst.name for inst in affected_instances.values()]) -
685
                  self.owned_locks(locking.LEVEL_INSTANCE))
686

    
687
      if node.offline:
688
        if affected_instances:
689
          msg = ("Cannot change secondary IP address: offline node has"
690
                 " instances (%s) configured to use it" %
691
                 utils.CommaJoin(
692
                   [inst.name for inst in affected_instances.values()]))
693
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
694
      else:
695
        # On online nodes, check that no instances are running, and that
696
        # the node has the new ip and we can reach it.
697
        for instance in affected_instances.values():
698
          CheckInstanceState(self, instance, INSTANCE_DOWN,
699
                             msg="cannot change secondary ip")
700

    
701
        _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
702
        if master.uuid != node.uuid:
703
          # check reachability from master secondary ip to new secondary ip
704
          if not netutils.TcpPing(self.op.secondary_ip,
705
                                  constants.DEFAULT_NODED_PORT,
706
                                  source=master.secondary_ip):
707
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
708
                                       " based ping to node daemon port",
709
                                       errors.ECODE_ENVIRON)
710

    
711
    if self.op.ndparams:
712
      new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
713
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
714
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
715
                           "node", "cluster or group")
716
      self.new_ndparams = new_ndparams
717

    
718
    if self.op.hv_state:
719
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
720
                                                node.hv_state_static)
721

    
722
    if self.op.disk_state:
723
      self.new_disk_state = \
724
        MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)
725

    
726
  def Exec(self, feedback_fn):
727
    """Modifies a node.
728

729
    """
730
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
731
    result = []
732

    
733
    if self.op.ndparams:
734
      node.ndparams = self.new_ndparams
735

    
736
    if self.op.powered is not None:
737
      node.powered = self.op.powered
738

    
739
    if self.op.hv_state:
740
      node.hv_state_static = self.new_hv_state
741

    
742
    if self.op.disk_state:
743
      node.disk_state_static = self.new_disk_state
744

    
745
    for attr in ["master_capable", "vm_capable"]:
746
      val = getattr(self.op, attr)
747
      if val is not None:
748
        setattr(node, attr, val)
749
        result.append((attr, str(val)))
750

    
751
    if self.new_role != self.old_role:
752
      # Tell the node to demote itself, if no longer MC and not offline
753
      if self.old_role == self._ROLE_CANDIDATE and \
754
          self.new_role != self._ROLE_OFFLINE:
755
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
756
        if msg:
757
          self.LogWarning("Node failed to demote itself: %s", msg)
758

    
759
      new_flags = self._R2F[self.new_role]
760
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
761
        if of != nf:
762
          result.append((desc, str(nf)))
763
      (node.master_candidate, node.drained, node.offline) = new_flags
764

    
765
      # we locked all nodes, we adjust the CP before updating this node
766
      if self.lock_all:
767
        AdjustCandidatePool(self, [node.uuid])
768

    
769
    if self.op.secondary_ip:
770
      node.secondary_ip = self.op.secondary_ip
771
      result.append(("secondary_ip", self.op.secondary_ip))
772

    
773
    # this will trigger configuration file update, if needed
774
    self.cfg.Update(node, feedback_fn)
775

    
776
    # this will trigger job queue propagation or cleanup if the mc
777
    # flag changed
778
    if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
779
      self.context.ReaddNode(node)
780

    
781
    return result
782

    
783

    
784
class LUNodePowercycle(NoHooksLU):
785
  """Powercycles a node.
786

787
  """
788
  REQ_BGL = False
789

    
790
  def CheckArguments(self):
791
    (self.op.node_uuid, self.op.node_name) = \
792
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
793

    
794
    if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
795
      raise errors.OpPrereqError("The node is the master and the force"
796
                                 " parameter was not set",
797
                                 errors.ECODE_INVAL)
798

    
799
  def ExpandNames(self):
800
    """Locking for PowercycleNode.
801

802
    This is a last-resort option and shouldn't block on other
803
    jobs. Therefore, we grab no locks.
804

805
    """
806
    self.needed_locks = {}
807

    
808
  def Exec(self, feedback_fn):
809
    """Reboots a node.
810

811
    """
812
    default_hypervisor = self.cfg.GetHypervisorType()
813
    hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
814
    result = self.rpc.call_node_powercycle(self.op.node_uuid,
815
                                           default_hypervisor,
816
                                           hvparams)
817
    result.Raise("Failed to schedule the reboot")
818
    return result.payload
819

    
820

    
821
def _GetNodeInstancesInner(cfg, fn):
822
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
823

    
824

    
825
def _GetNodePrimaryInstances(cfg, node_uuid):
826
  """Returns primary instances on a node.
827

828
  """
829
  return _GetNodeInstancesInner(cfg,
830
                                lambda inst: node_uuid == inst.primary_node)
831

    
832

    
833
def _GetNodeSecondaryInstances(cfg, node_uuid):
834
  """Returns secondary instances on a node.
835

836
  """
837
  return _GetNodeInstancesInner(cfg,
838
                                lambda inst: node_uuid in inst.secondary_nodes)
839

    
840

    
841
def _GetNodeInstances(cfg, node_uuid):
842
  """Returns a list of all primary and secondary instances on a node.
843

844
  """
845

    
846
  return _GetNodeInstancesInner(cfg, lambda inst: node_uuid in inst.all_nodes)
847

    
848

    
849
class LUNodeEvacuate(NoHooksLU):
850
  """Evacuates instances off a list of nodes.
851

852
  """
853
  REQ_BGL = False
854

    
855
  def CheckArguments(self):
856
    CheckIAllocatorOrNode(self, "iallocator", "remote_node")
857

    
858
  def ExpandNames(self):
859
    (self.op.node_uuid, self.op.node_name) = \
860
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
861

    
862
    if self.op.remote_node is not None:
863
      (self.op.remote_node_uuid, self.op.remote_node) = \
864
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
865
                              self.op.remote_node)
866
      assert self.op.remote_node
867

    
868
      if self.op.node_uuid == self.op.remote_node_uuid:
869
        raise errors.OpPrereqError("Can not use evacuated node as a new"
870
                                   " secondary node", errors.ECODE_INVAL)
871

    
872
      if self.op.mode != constants.NODE_EVAC_SEC:
873
        raise errors.OpPrereqError("Without the use of an iallocator only"
874
                                   " secondary instances can be evacuated",
875
                                   errors.ECODE_INVAL)
876

    
877
    # Declare locks
878
    self.share_locks = ShareAll()
879
    self.needed_locks = {
880
      locking.LEVEL_INSTANCE: [],
881
      locking.LEVEL_NODEGROUP: [],
882
      locking.LEVEL_NODE: [],
883
      }
884

    
885
    # Determine nodes (via group) optimistically, needs verification once locks
886
    # have been acquired
887
    self.lock_nodes = self._DetermineNodes()
888

    
889
  def _DetermineNodes(self):
890
    """Gets the list of node UUIDs to operate on.
891

892
    """
893
    if self.op.remote_node is None:
894
      # Iallocator will choose any node(s) in the same group
895
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
896
    else:
897
      group_nodes = frozenset([self.op.remote_node_uuid])
898

    
899
    # Determine nodes to be locked
900
    return set([self.op.node_uuid]) | group_nodes
901

    
902
  def _DetermineInstances(self):
903
    """Builds list of instances to operate on.
904

905
    """
906
    assert self.op.mode in constants.NODE_EVAC_MODES
907

    
908
    if self.op.mode == constants.NODE_EVAC_PRI:
909
      # Primary instances only
910
      inst_fn = _GetNodePrimaryInstances
911
      assert self.op.remote_node is None, \
912
        "Evacuating primary instances requires iallocator"
913
    elif self.op.mode == constants.NODE_EVAC_SEC:
914
      # Secondary instances only
915
      inst_fn = _GetNodeSecondaryInstances
916
    else:
917
      # All instances
918
      assert self.op.mode == constants.NODE_EVAC_ALL
919
      inst_fn = _GetNodeInstances
920
      # TODO: In 2.6, change the iallocator interface to take an evacuation mode
921
      # per instance
922
      raise errors.OpPrereqError("Due to an issue with the iallocator"
923
                                 " interface it is not possible to evacuate"
924
                                 " all instances at once; specify explicitly"
925
                                 " whether to evacuate primary or secondary"
926
                                 " instances",
927
                                 errors.ECODE_INVAL)
928

    
929
    return inst_fn(self.cfg, self.op.node_uuid)
930

    
931
  def DeclareLocks(self, level):
932
    if level == locking.LEVEL_INSTANCE:
933
      # Lock instances optimistically, needs verification once node and group
934
      # locks have been acquired
935
      self.needed_locks[locking.LEVEL_INSTANCE] = \
936
        set(i.name for i in self._DetermineInstances())
937

    
938
    elif level == locking.LEVEL_NODEGROUP:
939
      # Lock node groups for all potential target nodes optimistically, needs
940
      # verification once nodes have been acquired
941
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
942
        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)
943

    
944
    elif level == locking.LEVEL_NODE:
945
      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
946

    
947
  def CheckPrereq(self):
948
    # Verify locks
949
    owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
950
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
951
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
952

    
953
    need_nodes = self._DetermineNodes()
954

    
955
    if not owned_nodes.issuperset(need_nodes):
956
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
957
                                 " locks were acquired, current nodes are"
958
                                 " are '%s', used to be '%s'; retry the"
959
                                 " operation" %
960
                                 (self.op.node_name,
961
                                  utils.CommaJoin(need_nodes),
962
                                  utils.CommaJoin(owned_nodes)),
963
                                 errors.ECODE_STATE)
964

    
965
    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
966
    if owned_groups != wanted_groups:
967
      raise errors.OpExecError("Node groups changed since locks were acquired,"
968
                               " current groups are '%s', used to be '%s';"
969
                               " retry the operation" %
970
                               (utils.CommaJoin(wanted_groups),
971
                                utils.CommaJoin(owned_groups)))
972

    
973
    # Determine affected instances
974
    self.instances = self._DetermineInstances()
975
    self.instance_names = [i.name for i in self.instances]
976

    
977
    if set(self.instance_names) != owned_instance_names:
978
      raise errors.OpExecError("Instances on node '%s' changed since locks"
979
                               " were acquired, current instances are '%s',"
980
                               " used to be '%s'; retry the operation" %
981
                               (self.op.node_name,
982
                                utils.CommaJoin(self.instance_names),
983
                                utils.CommaJoin(owned_instance_names)))
984

    
985
    if self.instance_names:
986
      self.LogInfo("Evacuating instances from node '%s': %s",
987
                   self.op.node_name,
988
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
989
    else:
990
      self.LogInfo("No instances to evacuate from node '%s'",
991
                   self.op.node_name)
992

    
993
    if self.op.remote_node is not None:
994
      for i in self.instances:
995
        if i.primary_node == self.op.remote_node_uuid:
996
          raise errors.OpPrereqError("Node %s is the primary node of"
997
                                     " instance %s, cannot use it as"
998
                                     " secondary" %
999
                                     (self.op.remote_node, i.name),
1000
                                     errors.ECODE_INVAL)
1001

    
1002
  def Exec(self, feedback_fn):
1003
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)
1004

    
1005
    if not self.instance_names:
1006
      # No instances to evacuate
1007
      jobs = []
1008

    
1009
    elif self.op.iallocator is not None:
1010
      # TODO: Implement relocation to other group
1011
      req = iallocator.IAReqNodeEvac(evac_mode=self.op.mode,
1012
                                     instances=list(self.instance_names))
1013
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)
1014

    
1015
      ial.Run(self.op.iallocator)
1016

    
1017
      if not ial.success:
1018
        raise errors.OpPrereqError("Can't compute node evacuation using"
1019
                                   " iallocator '%s': %s" %
1020
                                   (self.op.iallocator, ial.info),
1021
                                   errors.ECODE_NORES)
1022

    
1023
      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)
1024

    
1025
    elif self.op.remote_node is not None:
1026
      assert self.op.mode == constants.NODE_EVAC_SEC
1027
      jobs = [
1028
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
1029
                                        remote_node=self.op.remote_node,
1030
                                        disks=[],
1031
                                        mode=constants.REPLACE_DISK_CHG,
1032
                                        early_release=self.op.early_release)]
1033
        for instance_name in self.instance_names]
1034

    
1035
    else:
1036
      raise errors.ProgrammerError("No iallocator or remote node")
1037

    
1038
    return ResultWithJobs(jobs)
1039

    
1040

    
1041
class LUNodeMigrate(LogicalUnit):
1042
  """Migrate all instances from a node.
1043

1044
  """
1045
  HPATH = "node-migrate"
1046
  HTYPE = constants.HTYPE_NODE
1047
  REQ_BGL = False
1048

    
1049
  def CheckArguments(self):
1050
    pass
1051

    
1052
  def ExpandNames(self):
1053
    (self.op.node_uuid, self.op.node_name) = \
1054
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1055

    
1056
    self.share_locks = ShareAll()
1057
    self.needed_locks = {
1058
      locking.LEVEL_NODE: [self.op.node_uuid],
1059
      }
1060

    
1061
  def BuildHooksEnv(self):
1062
    """Build hooks env.
1063

1064
    This runs on the master, the primary and all the secondaries.
1065

1066
    """
1067
    return {
1068
      "NODE_NAME": self.op.node_name,
1069
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
1070
      }
1071

    
1072
  def BuildHooksNodes(self):
1073
    """Build hooks nodes.
1074

1075
    """
1076
    nl = [self.cfg.GetMasterNode()]
1077
    return (nl, nl)
1078

    
1079
  def CheckPrereq(self):
1080
    pass
1081

    
1082
  def Exec(self, feedback_fn):
1083
    # Prepare jobs for migration instances
1084
    jobs = [
1085
      [opcodes.OpInstanceMigrate(
1086
        instance_name=inst.name,
1087
        mode=self.op.mode,
1088
        live=self.op.live,
1089
        iallocator=self.op.iallocator,
1090
        target_node=self.op.target_node,
1091
        allow_runtime_changes=self.op.allow_runtime_changes,
1092
        ignore_ipolicy=self.op.ignore_ipolicy)]
1093
      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]
1094

    
1095
    # TODO: Run iallocator in this opcode and pass correct placement options to
1096
    # OpInstanceMigrate. Since other jobs can modify the cluster between
1097
    # running the iallocator and the actual migration, a good consistency model
1098
    # will have to be found.
1099

    
1100
    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
1101
            frozenset([self.op.node_uuid]))
1102

    
1103
    return ResultWithJobs(jobs)
1104

    
1105

    
1106
def _GetStorageTypeArgs(cfg, storage_type):
1107
  """Returns the arguments for a storage type.
1108

1109
  """
1110
  # Special case for file storage
1111
  if storage_type == constants.ST_FILE:
1112
    # storage.FileStorage wants a list of storage directories
1113
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1114

    
1115
  return []
1116

    
1117

    
1118
class LUNodeModifyStorage(NoHooksLU):
1119
  """Logical unit for modifying a storage volume on a node.
1120

1121
  """
1122
  REQ_BGL = False
1123

    
1124
  def CheckArguments(self):
1125
    (self.op.node_uuid, self.op.node_name) = \
1126
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1127

    
1128
    storage_type = self.op.storage_type
1129

    
1130
    try:
1131
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
1132
    except KeyError:
1133
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
1134
                                 " modified" % storage_type,
1135
                                 errors.ECODE_INVAL)
1136

    
1137
    diff = set(self.op.changes.keys()) - modifiable
1138
    if diff:
1139
      raise errors.OpPrereqError("The following fields can not be modified for"
1140
                                 " storage units of type '%s': %r" %
1141
                                 (storage_type, list(diff)),
1142
                                 errors.ECODE_INVAL)
1143

    
1144
  def CheckPrereq(self):
1145
    """Check prerequisites.
1146

1147
    """
1148
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1149

    
1150
  def ExpandNames(self):
1151
    self.needed_locks = {
1152
      locking.LEVEL_NODE: self.op.node_uuid,
1153
      }
1154

    
1155
  def Exec(self, feedback_fn):
1156
    """Computes the list of nodes and their attributes.
1157

1158
    """
1159
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1160
    result = self.rpc.call_storage_modify(self.op.node_uuid,
1161
                                          self.op.storage_type, st_args,
1162
                                          self.op.name, self.op.changes)
1163
    result.Raise("Failed to modify storage unit '%s' on %s" %
1164
                 (self.op.name, self.op.node_name))
1165

    
1166

    
1167
class NodeQuery(QueryBase):
1168
  FIELDS = query.NODE_FIELDS
1169

    
1170
  def ExpandNames(self, lu):
1171
    lu.needed_locks = {}
1172
    lu.share_locks = ShareAll()
1173

    
1174
    if self.names:
1175
      (self.wanted, _) = GetWantedNodes(lu, self.names)
1176
    else:
1177
      self.wanted = locking.ALL_SET
1178

    
1179
    self.do_locking = (self.use_locking and
1180
                       query.NQ_LIVE in self.requested_data)
1181

    
1182
    if self.do_locking:
1183
      # If any non-static field is requested we need to lock the nodes
1184
      lu.needed_locks[locking.LEVEL_NODE] = self.wanted
1185
      lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1186

    
1187
  def DeclareLocks(self, lu, level):
1188
    pass
1189

    
1190
  def _GetQueryData(self, lu):
1191
    """Computes the list of nodes and their attributes.
1192

1193
    """
1194
    all_info = lu.cfg.GetAllNodesInfo()
1195

    
1196
    node_uuids = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)
1197

    
1198
    # Gather data as requested
1199
    if query.NQ_LIVE in self.requested_data:
1200
      # filter out non-vm_capable nodes
1201
      toquery_node_uuids = [node.uuid for node in all_info.values()
1202
                            if node.vm_capable and node.uuid in node_uuids]
1203
      default_template = lu.cfg.GetClusterInfo().enabled_disk_templates[0]
1204
      raw_storage_units = utils.storage.GetStorageUnits(
1205
          lu.cfg, [default_template])
1206
      storage_units = rpc.PrepareStorageUnitsForNodes(
1207
          lu.cfg, raw_storage_units, toquery_node_uuids)
1208
      default_hypervisor = lu.cfg.GetHypervisorType()
1209
      hvparams = lu.cfg.GetClusterInfo().hvparams[default_hypervisor]
1210
      hvspecs = [(default_hypervisor, hvparams)]
1211
      node_data = lu.rpc.call_node_info(toquery_node_uuids, storage_units,
1212
                                        hvspecs)
1213
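      # only keep results from nodes that answered the RPC and returned a
      # payload; everything else simply has no live data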
      live_data = dict(
1214
          (uuid, rpc.MakeLegacyNodeInfo(nresult.payload, default_template))
1215
          for (uuid, nresult) in node_data.items()
1216
          if not nresult.fail_msg and nresult.payload)
1217
    else:
1218
      live_data = None
1219

    
1220
    if query.NQ_INST in self.requested_data:
1221
      node_to_primary = dict([(uuid, set()) for uuid in node_uuids])
1222
      node_to_secondary = dict([(uuid, set()) for uuid in node_uuids])
1223

    
1224
      inst_data = lu.cfg.GetAllInstancesInfo()
1225
      inst_uuid_to_inst_name = {}
1226

    
1227
      for inst in inst_data.values():
1228
        inst_uuid_to_inst_name[inst.uuid] = inst.name
1229
        if inst.primary_node in node_to_primary:
1230
          node_to_primary[inst.primary_node].add(inst.uuid)
1231
        for secnode in inst.secondary_nodes:
1232
          if secnode in node_to_secondary:
1233
            node_to_secondary[secnode].add(inst.uuid)
1234
    else:
1235
      node_to_primary = None
1236
      node_to_secondary = None
1237
      inst_uuid_to_inst_name = None
1238

    
1239
    if query.NQ_OOB in self.requested_data:
1240
      oob_support = dict((uuid, bool(SupportsOob(lu.cfg, node)))
1241
                         for uuid, node in all_info.iteritems())
1242
    else:
1243
      oob_support = None
1244

    
1245
    if query.NQ_GROUP in self.requested_data:
1246
      groups = lu.cfg.GetAllNodeGroupsInfo()
1247
    else:
1248
      groups = {}
1249

    
1250
    return query.NodeQueryData([all_info[uuid] for uuid in node_uuids],
1251
                               live_data, lu.cfg.GetMasterNode(),
1252
                               node_to_primary, node_to_secondary,
1253
                               inst_uuid_to_inst_name, groups, oob_support,
1254
                               lu.cfg.GetClusterInfo())
1255

    
1256

    
1257
class LUNodeQuery(NoHooksLU):
1258
  """Logical unit for querying nodes.
1259

1260
  """
1261
  # pylint: disable=W0142
1262
  REQ_BGL = False
1263

    
1264
  def CheckArguments(self):
1265
    self.nq = NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
1266
                         self.op.output_fields, self.op.use_locking)
1267

    
1268
  def ExpandNames(self):
1269
    self.nq.ExpandNames(self)
1270

    
1271
  def DeclareLocks(self, level):
1272
    self.nq.DeclareLocks(self, level)
1273

    
1274
  def Exec(self, feedback_fn):
1275
    return self.nq.OldStyleQuery(self)
1276

    
1277

    
1278
def _CheckOutputFields(fields, selected):
1279
  """Checks whether all selected fields are valid according to fields.
1280

1281
  @type fields: L{utils.FieldSet}
1282
  @param fields: fields set
1283
  @type selected: L{utils.FieldSet}
1284
  @param selected: fields set
1285

1286
  """
1287
  delta = fields.NonMatching(selected)
1288
  if delta:
1289
    raise errors.OpPrereqError("Unknown output fields selected: %s"
1290
                               % ",".join(delta), errors.ECODE_INVAL)
1291

    
1292

    
1293
class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(constants.VF_NODE, constants.VF_PHYS,
                                      constants.VF_VG, constants.VF_NAME,
                                      constants.VF_SIZE, constants.VF_INSTANCE),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

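    # If an explicit node list was given, lock only those nodes; otherwise
    # all nodes (plus the node allocation lock) are acquired, in shared mode.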
    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def Exec(self, feedback_fn):
    """Computes the list of volumes and their attributes.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    volumes = self.rpc.call_node_volumes(node_uuids)

    ilist = self.cfg.GetAllInstancesInfo()
    vol2inst = MapInstanceLvsToNodes(ilist.values())

    output = []
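    # One output row is produced per volume; offline nodes are skipped and
    # per-node RPC failures only generate a warning.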
    for node_uuid in node_uuids:
      nresult = volumes[node_uuid]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue

      node_vols = sorted(nresult.payload,
                         key=operator.itemgetter(constants.VF_DEV))

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == constants.VF_NODE:
            val = self.cfg.GetNodeName(node_uuid)
          elif field == constants.VF_PHYS:
            val = vol[constants.VF_DEV]
          elif field == constants.VF_VG:
            val = vol[constants.VF_VG]
          elif field == constants.VF_NAME:
            val = vol[constants.VF_NAME]
          elif field == constants.VF_SIZE:
            val = int(float(vol[constants.VF_SIZE]))
          elif field == constants.VF_INSTANCE:
            inst = vol2inst.get((node_uuid, vol[constants.VF_VG] + "/" +
                                 vol[constants.VF_NAME]), None)
            if inst is not None:
              val = inst.name
            else:
              val = "-"
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def _DetermineStorageType(self):
    """Determines the default storage type of the cluster.

    """
    enabled_disk_templates = self.cfg.GetClusterInfo().enabled_disk_templates
    default_storage_type = \
        constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[enabled_disk_templates[0]]
    return default_storage_type

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.storage_type:
      CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()
      if self.storage_type not in constants.STS_REPORT:
        raise errors.OpPrereqError(
            "Storage reporting for storage type '%s' is not supported. Please"
            " use the --storage-type option to specify one of the supported"
            " storage types (%s) or set the default disk template to one that"
            " supports storage reporting." %
            (self.storage_type, utils.CommaJoin(constants.STS_REPORT)))

  def Exec(self, feedback_fn):
    """Computes the list of storage units and their attributes.

    """
    if self.op.storage_type:
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()

    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.storage_type, st_args,
                                      self.op.name, fields)

    result = []

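    # Emit one row per storage unit, ordered by node name and then by unit
    # name; offline nodes are skipped and RPC failures only log a warning.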
    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      nresult = data[node_uuid]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s",
                        node_name, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node_name
          elif field == constants.SF_TYPE:
            val = self.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result


class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_uuid)
    except ValueError:
      pass
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    assert node is not None

    masternode = self.cfg.GetMasterNode()
    if node.uuid == masternode:
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)

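    # A node can only be removed if no instance uses it, either as its
    # primary or as one of its secondary nodes.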
    for _, instance in self.cfg.GetAllInstancesInfo().items():
      if node.uuid in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first" % instance.name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    logging.info("Stopping the node daemon and removing configs from node %s",
                 self.node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # Promote nodes to master candidate as needed
    AdjustCandidatePool(self, exceptions=[self.node.uuid])
    self.context.RemoveNode(self.node)

    # Run post hooks on the node before it's removed
    RunPostHook(self, self.node.name)

    # we have to call this by name rather than by UUID, as the node is no
    # longer in the config
    result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node_uuid = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node_uuid,
                                              constants.ETC_HOSTS_REMOVE,
                                              self.node.name, None)
      result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)


class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    if (constants.SO_FIX_CONSISTENCY not in
        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " repaired" % storage_type,
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def _CheckFaultyDisks(self, instance, node_uuid):
    """Ensure faulty disks abort the opcode or at least warn."""
    try:
      if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                 node_uuid, True):
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                   " node '%s'" %
                                   (instance.name,
                                    self.cfg.GetNodeName(node_uuid)),
                                   errors.ECODE_STATE)
    except errors.OpPrereqError, err:
      if self.op.ignore_consistency:
        self.LogWarning(str(err.args[0]))
      else:
        raise

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

    # Check whether any instance on this node has faulty disks
    for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
      if not inst.disks_active:
        continue
      check_nodes = set(inst.all_nodes)
      check_nodes.discard(self.op.node_uuid)
      for inst_node_uuid in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_uuid)

  def Exec(self, feedback_fn):
    feedback_fn("Repairing storage unit '%s' on %s ..." %
                (self.op.name, self.op.node_name))

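    # The actual repair is delegated to the node daemon through the
    # storage_execute RPC, using the "fix consistency" storage operation.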
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_execute(self.op.node_uuid,
                                           self.op.storage_type, st_args,
                                           self.op.name,
                                           constants.SO_FIX_CONSISTENCY)
    result.Raise("Failed to repair storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))