#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with nodes."""

import logging
import operator

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
import ganeti.rpc.node as rpc
from ganeti import utils
from ganeti.masterd import iallocator

from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, ResultWithJobs
from ganeti.cmdlib.common import CheckParamsNotGlobal, \
  MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  IsExclusiveStorageEnabledNode, CheckNodePVs, \
  RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
  CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
  AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
  GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
  FindFaultyInstanceDisks, CheckStorageTypeEnabled, CreateNewClientCert, \
  AddNodeCertToCandidateCerts, RemoveNodeCertFromCandidateCerts


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max by one, so:
  mc_should = min(mc_should + 1, cp_size)
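  # Illustrative example: with candidate_pool_size=10, mc_now=3 and
  # mc_should=3, the target becomes min(3 + 1, 10) = 4; since 3 < 4, the
  # new node promotes itself.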
  return mc_now < mc_should


def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: L{objects.Node}
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip,
  and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  # this can be called with a new node, which has no UUID yet, so perform the
  # RPC call using its name
  result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node.name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)


class LUNodeAdd(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _NFLAGS = ["master_capable", "vm_capable"]

  def CheckArguments(self):
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
    # validate/normalize the node name
    self.hostname = netutils.GetHostname(name=self.op.node_name,
                                         family=self.primary_ip_family)
    self.op.node_name = self.hostname.name

    if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
      raise errors.OpPrereqError("Cannot readd the master node",
                                 errors.ECODE_STATE)

    if self.op.readd and self.op.group:
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                 " being readded", errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    hook_nodes = self.cfg.GetNodeList()
    new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
    if new_node_info is not None:
      # Exclude added node
      hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))

    # add the new node as a post hook node by name; it does not have a UUID yet
    return (hook_nodes, hook_nodes, [self.op.node_name, ])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.hostname.name
    self.op.primary_ip = self.hostname.ip
    if self.op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using an IPv6 primary address, a valid"
                                   " IPv4 address must be given as secondary",
                                   errors.ECODE_INVAL)
      self.op.secondary_ip = self.op.primary_ip

    secondary_ip = self.op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip, errors.ECODE_INVAL)

    existing_node_info = self.cfg.GetNodeInfoByName(node_name)
    if not self.op.readd and existing_node_info is not None:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node_name, errors.ECODE_EXISTS)
    elif self.op.readd and existing_node_info is None:
      raise errors.OpPrereqError("Node %s is not in the configuration" %
                                 node_name, errors.ECODE_NOENT)

    self.changed_primary_ip = False

    for existing_node in self.cfg.GetAllNodesInfo().values():
      if self.op.readd and node_name == existing_node.name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != self.op.primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == self.op.primary_ip or
          existing_node.secondary_ip == self.op.primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # After this 'if' block, None is no longer a valid value for the
    # _capable op attributes
    if self.op.readd:
      assert existing_node_info is not None, \
        "Can't retrieve locked node %s" % node_name
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, getattr(existing_node_info, attr))
    else:
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, True)

    if self.op.readd and not self.op.vm_capable:
      pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % node_name,
                                   errors.ECODE_STATE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
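    # (a node is single-homed when its secondary IP equals its primary IP)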
    myself = self.cfg.GetMasterNodeInfo()
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == self.op.primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                              source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [existing_node_info.uuid]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False

    node_group = self.cfg.LookupNodeGroup(self.op.group)

    if self.op.readd:
      self.new_node = existing_node_info
    else:
      self.new_node = objects.Node(name=node_name,
                                   primary_ip=self.op.primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=node_group, ndparams={})

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)

    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
    #       it a property on the base class.
    rpcrunner = rpc.DnsOnlyRunner()
    result = rpcrunner.call_version([node_name])[node_name]
    result.Raise("Can't get version information from node %s" % node_name)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node_name, result.payload)
    else:
      raise errors.OpPrereqError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.payload),
                                 errors.ECODE_ENVIRON)

    vg_name = self.cfg.GetVGName()
    if vg_name is not None:
      vparams = {constants.NV_PVLIST: [vg_name]}
      excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
      cname = self.cfg.GetClusterName()
      result = rpcrunner.call_node_verify_light(
          [node_name], vparams, cname,
          self.cfg.GetClusterInfo().hvparams,
          {node_name: node_group},
          self.cfg.GetAllNodeGroupsInfoDict()
        )[node_name]
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
      if errmsgs:
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)

  def _InitOpenVSwitch(self):
    filled_ndparams = self.cfg.GetClusterInfo().FillND(
      self.new_node, self.cfg.GetNodeGroup(self.new_node.group))

    ovs = filled_ndparams.get(constants.ND_OVS, None)
    ovs_name = filled_ndparams.get(constants.ND_OVS_NAME, None)
    ovs_link = filled_ndparams.get(constants.ND_OVS_LINK, None)

    if ovs:
      if not ovs_link:
        self.LogInfo("No physical interface for OpenvSwitch was given."
                     " OpenvSwitch will not have an outside connection. This"
                     " might not be what you want.")

      result = self.rpc.call_node_configure_ovs(
                 self.new_node.name, ovs_name, ovs_link)
      result.Raise("Failed to initialize OpenVSwitch on new node")

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # We are adding a new node, so we assume it's powered
    self.new_node.powered = True

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      self.new_node.offline = False
      self.new_node.drained = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      self.new_node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        self.new_node.primary_ip = self.op.primary_ip

    # copy the master/vm_capable flags
    for attr in self._NFLAGS:
      setattr(self.new_node, attr, getattr(self.op, attr))

    # notify the user about any possible mc promotion
    if self.new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    if self.op.ndparams:
      self.new_node.ndparams = self.op.ndparams
    else:
      self.new_node.ndparams = {}

    if self.op.hv_state:
      self.new_node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.new_node.disk_state_static = self.new_disk_state

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(
                 master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
                 self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    if self.new_node.secondary_ip != self.new_node.primary_ip:
      _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
                               False)

    node_verifier_uuids = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: ([self.new_node.name], {}),
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(
               node_verifier_uuids, node_verify_param,
               self.cfg.GetClusterName(),
               self.cfg.GetClusterInfo().hvparams,
               {self.new_node.name: self.cfg.LookupNodeGroup(self.op.group)},
               self.cfg.GetAllNodeGroupsInfoDict()
               )
    for verifier in node_verifier_uuids:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed")

    self._InitOpenVSwitch()

    if self.op.readd:
      self.context.ReaddNode(self.new_node)
      RedistributeAncillaryFiles(self)
      # make sure we redistribute the config
      self.cfg.Update(self.new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not self.new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
        result.Warn("Node failed to demote itself from master candidate status",
                    self.LogWarning)
    else:
      self.context.AddNode(self.new_node, self.proc.GetECId())
      RedistributeAncillaryFiles(self)

    cluster = self.cfg.GetClusterInfo()
    # We create a new certificate even if the node is readded
    digest = CreateNewClientCert(self, self.new_node.uuid)
    if self.new_node.master_candidate:
      utils.AddNodeToCandidateCerts(self.new_node.uuid, digest,
                                    cluster.candidate_certs)
      self.cfg.Update(cluster, feedback_fn)
    else:
      if self.new_node.uuid in cluster.candidate_certs:
        utils.RemoveNodeFromCandidateCerts(self.new_node.uuid,
                                           cluster.candidate_certs)
        self.cfg.Update(cluster, feedback_fn)


class LUNodeSetParams(LogicalUnit):
  """Modifies the parameters of a node.

  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
      to the node role (as _ROLE_*)
  @cvar _R2F: a dictionary from node role to tuples of flags
  @cvar _FLAGS: a list of attribute names corresponding to the flags

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
  _F2R = {
    (True, False, False): _ROLE_CANDIDATE,
    (False, True, False): _ROLE_DRAINED,
    (False, False, True): _ROLE_OFFLINE,
    (False, False, False): _ROLE_REGULAR,
    }
  _R2F = dict((v, k) for k, v in _F2R.items())
  _FLAGS = ["master_candidate", "drained", "offline"]

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
                self.op.master_capable, self.op.vm_capable,
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
                self.op.disk_state]
    if all_mods.count(None) == len(all_mods):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # Boolean value that tells us whether we might be demoting from MC
    self.might_demote = (self.op.master_candidate is False or
                         self.op.offline is True or
                         self.op.drained is True or
                         self.op.master_capable is False)

    if self.op.secondary_ip:
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                   " address" % self.op.secondary_ip,
                                   errors.ECODE_INVAL)

    self.lock_all = self.op.auto_promote and self.might_demote
    self.lock_instances = self.op.secondary_ip is not None

  def _InstanceFilter(self, instance):
    """Filter for getting affected instances.

    """
    return (instance.disk_template in constants.DTS_INT_MIRROR and
            self.op.node_uuid in instance.all_nodes)

  def ExpandNames(self):
    if self.lock_all:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,

        # Block allocations when all nodes are locked
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: self.op.node_uuid,
        }

    # Since modifying a node can have severe effects on currently running
    # operations the resource lock is at least acquired in shared mode
    self.needed_locks[locking.LEVEL_NODE_RES] = \
      self.needed_locks[locking.LEVEL_NODE]

    # Get all locks except nodes in shared mode; they are not used for anything
    # but read-only access
    self.share_locks = ShareAll()
    self.share_locks[locking.LEVEL_NODE] = 0
    self.share_locks[locking.LEVEL_NODE_RES] = 0
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 0

    if self.lock_instances:
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(
          self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    if self.lock_instances:
      affected_instances = \
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)

      # Verify instance locks
      owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
      wanted_instance_names = frozenset([inst.name for inst in
                                         affected_instances.values()])
      if wanted_instance_names - owned_instance_names:
        raise errors.OpPrereqError("Instances affected by changing node %s's"
                                   " secondary IP address have changed since"
                                   " locks were acquired, wanted '%s', have"
                                   " '%s'; retry the operation" %
                                   (node.name,
                                    utils.CommaJoin(wanted_instance_names),
                                    utils.CommaJoin(owned_instance_names)),
                                   errors.ECODE_STATE)
    else:
      affected_instances = None

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if node.uuid == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via master-failover",
                                   errors.ECODE_INVAL)

    if self.op.master_candidate and not node.master_capable:
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
                                 " it a master candidate" % node.name,
                                 errors.ECODE_STATE)

    if self.op.vm_capable is False:
      (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
      if ipri or isec:
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
                                   " the vm_capable flag" % node.name,
                                   errors.ECODE_STATE)

    if node.master_candidate and self.might_demote and not self.lock_all:
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
      # check if after removing the current node, we're missing master
      # candidates
      (mc_remaining, mc_should, _) = \
          self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
      if mc_remaining < mc_should:
        raise errors.OpPrereqError("Not enough master candidates, please"
                                   " pass auto promote option to allow"
                                   " promotion (--auto-promote or RAPI"
                                   " auto_promote=True)", errors.ECODE_STATE)

    self.old_flags = old_flags = (node.master_candidate,
                                  node.drained, node.offline)
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
    self.old_role = old_role = self._F2R[old_flags]

    # Check for ineffective changes
    for attr in self._FLAGS:
      if getattr(self.op, attr) is False and getattr(node, attr) is False:
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
        setattr(self.op, attr, None)

    # Past this point, any flag change to False means a transition
    # away from the respective state, as only real changes are kept

    # TODO: We might query the real power state if it supports OOB
    if SupportsOob(self.cfg, node):
      if self.op.offline is False and not (node.powered or
                                           self.op.powered is True):
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
                                    " offline status can be reset") %
                                   self.op.node_name, errors.ECODE_STATE)
    elif self.op.powered is not None:
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
                                  " as it does not support out-of-band"
                                  " handling") % self.op.node_name,
                                 errors.ECODE_STATE)

    # If we're being deofflined/drained, we'll MC ourself if needed
    if (self.op.drained is False or self.op.offline is False or
        (self.op.master_capable and not node.master_capable)):
      if _DecideSelfPromotion(self):
        self.op.master_candidate = True
        self.LogInfo("Auto-promoting node to master candidate")

    # If we're no longer master capable, we'll demote ourselves from MC
    if self.op.master_capable is False and node.master_candidate:
      self.LogInfo("Demoting from master candidate")
      self.op.master_candidate = False

    # Compute new role
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
    if self.op.master_candidate:
      new_role = self._ROLE_CANDIDATE
    elif self.op.drained:
      new_role = self._ROLE_DRAINED
    elif self.op.offline:
      new_role = self._ROLE_OFFLINE
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
      # False is still in new flags, which means we're un-setting (the
      # only) True flag
      new_role = self._ROLE_REGULAR
    else: # no new flags, nothing, keep old role
      new_role = old_role
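    # Illustrative example: taking a master candidate offline (offline=True)
    # yields new_role == self._ROLE_OFFLINE; Exec() then writes back
    # _R2F[_ROLE_OFFLINE] == (False, False, True), clearing the candidate flag.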

    self.new_role = new_role

    if old_role == self._ROLE_OFFLINE and new_role != old_role:
      # Trying to transition out of offline status
      result = self.rpc.call_version([node.uuid])[node.uuid]
      if result.fail_msg:
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
                                   " to report its version: %s" %
                                   (node.name, result.fail_msg),
                                   errors.ECODE_STATE)
      else:
        self.LogWarning("Transitioning node from offline to online state"
                        " without using re-add. Please make sure the node"
                        " is healthy!")

    # When changing the secondary ip, verify if this is a single-homed to
    # multi-homed transition or vice versa, and apply the relevant
    # restrictions.
    if self.op.secondary_ip:
      # Ok even without locking, because this can't be changed by any LU
      master = self.cfg.GetMasterNodeInfo()
      master_singlehomed = master.secondary_ip == master.primary_ip
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from single-homed to multi-homed"
                          " cluster; all nodes will require a secondary IP"
                          " address")
        else:
          raise errors.OpPrereqError("Changing the secondary ip on a"
                                     " single-homed cluster requires the"
                                     " --force option to be passed, and the"
                                     " target node to be the master",
                                     errors.ECODE_INVAL)
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from multi-homed to single-homed"
                          " cluster; secondary IP addresses will have to be"
                          " removed")
        else:
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
                                     " same as the primary IP on a multi-homed"
                                     " cluster, unless the --force option is"
                                     " passed, and the target node is the"
                                     " master", errors.ECODE_INVAL)

      assert not (set([inst.name for inst in affected_instances.values()]) -
                  self.owned_locks(locking.LEVEL_INSTANCE))

      if node.offline:
        if affected_instances:
          msg = ("Cannot change secondary IP address: offline node has"
                 " instances (%s) configured to use it" %
                 utils.CommaJoin(
                   [inst.name for inst in affected_instances.values()]))
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
      else:
        # On online nodes, check that no instances are running, and that
        # the node has the new ip and we can reach it.
        for instance in affected_instances.values():
          CheckInstanceState(self, instance, INSTANCE_DOWN,
                             msg="cannot change secondary ip")

        _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
        if master.uuid != node.uuid:
          # check reachability from master secondary ip to new secondary ip
          if not netutils.TcpPing(self.op.secondary_ip,
                                  constants.DEFAULT_NODED_PORT,
                                  source=master.secondary_ip):
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                       " based ping to node daemon port",
                                       errors.ECODE_ENVIRON)

    if self.op.ndparams:
      new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")
      self.new_ndparams = new_ndparams

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                                node.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    result = []

    if self.op.ndparams:
      node.ndparams = self.new_ndparams

    if self.op.powered is not None:
      node.powered = self.op.powered

    if self.op.hv_state:
      node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      node.disk_state_static = self.new_disk_state

    for attr in ["master_capable", "vm_capable"]:
      val = getattr(self.op, attr)
      if val is not None:
        setattr(node, attr, val)
        result.append((attr, str(val)))

    if self.new_role != self.old_role:
      # Tell the node to demote itself, if no longer MC and not offline
      if self.old_role == self._ROLE_CANDIDATE and \
          self.new_role != self._ROLE_OFFLINE:
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s", msg)

      new_flags = self._R2F[self.new_role]
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
        if of != nf:
          result.append((desc, str(nf)))
      (node.master_candidate, node.drained, node.offline) = new_flags

      # we locked all nodes, we adjust the CP before updating this node
      if self.lock_all:
        AdjustCandidatePool(self, [node.uuid], feedback_fn)

      cluster = self.cfg.GetClusterInfo()
      # if the node gets promoted, grant RPC privileges
      if self.new_role == self._ROLE_CANDIDATE:
        AddNodeCertToCandidateCerts(self, node.uuid, cluster)
      # if the node is demoted, revoke RPC privileges
      if self.old_role == self._ROLE_CANDIDATE:
        RemoveNodeCertFromCandidateCerts(node.uuid, cluster)

    if self.op.secondary_ip:
      node.secondary_ip = self.op.secondary_ip
      result.append(("secondary_ip", self.op.secondary_ip))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)

    # this will trigger job queue propagation or cleanup if the mc
    # flag changed
    if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
      self.context.ReaddNode(node)

    return result


class LUNodePowercycle(NoHooksLU):
  """Powercycles a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    default_hypervisor = self.cfg.GetHypervisorType()
    hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
    result = self.rpc.call_node_powercycle(self.op.node_uuid,
                                           default_hypervisor,
                                           hvparams)
    result.Raise("Failed to schedule the reboot")
    return result.payload


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodePrimaryInstances(cfg, node_uuid):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_uuid):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid in
                                  cfg.GetInstanceSecondaryNodes(inst))


def _GetNodeInstances(cfg, node_uuid):
  """Returns a list of all primary and secondary instances on a node.

  """

  return _GetNodeInstancesInner(cfg, lambda inst: node_uuid in inst.all_nodes)


class LUNodeEvacuate(NoHooksLU):
  """Evacuates instances off a list of nodes.

  """
  REQ_BGL = False

  def CheckArguments(self):
    CheckIAllocatorOrNode(self, "iallocator", "remote_node")

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)
      assert self.op.remote_node

      if self.op.node_uuid == self.op.remote_node_uuid:
        raise errors.OpPrereqError("Can not use evacuated node as a new"
                                   " secondary node", errors.ECODE_INVAL)

      if self.op.mode != constants.NODE_EVAC_SEC:
        raise errors.OpPrereqError("Without the use of an iallocator only"
                                   " secondary instances can be evacuated",
                                   errors.ECODE_INVAL)

    # Declare locks
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

    # Determine nodes (via group) optimistically, needs verification once locks
    # have been acquired
    self.lock_nodes = self._DetermineNodes()

  def _DetermineNodes(self):
    """Gets the list of node UUIDs to operate on.

    """
    if self.op.remote_node is None:
      # Iallocator will choose any node(s) in the same group
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
    else:
      group_nodes = frozenset([self.op.remote_node_uuid])

    # Determine nodes to be locked
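    # (the node being evacuated itself plus any potential target nodes)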
    return set([self.op.node_uuid]) | group_nodes

  def _DetermineInstances(self):
    """Builds list of instances to operate on.

    """
    assert self.op.mode in constants.NODE_EVAC_MODES

    if self.op.mode == constants.NODE_EVAC_PRI:
      # Primary instances only
      inst_fn = _GetNodePrimaryInstances
      assert self.op.remote_node is None, \
        "Evacuating primary instances requires iallocator"
    elif self.op.mode == constants.NODE_EVAC_SEC:
      # Secondary instances only
      inst_fn = _GetNodeSecondaryInstances
    else:
      # All instances
      assert self.op.mode == constants.NODE_EVAC_ALL
      inst_fn = _GetNodeInstances
      # TODO: In 2.6, change the iallocator interface to take an evacuation mode
      # per instance
      raise errors.OpPrereqError("Due to an issue with the iallocator"
                                 " interface it is not possible to evacuate"
                                 " all instances at once; specify explicitly"
                                 " whether to evacuate primary or secondary"
                                 " instances",
                                 errors.ECODE_INVAL)

    return inst_fn(self.cfg, self.op.node_uuid)

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        set(i.name for i in self._DetermineInstances())

    elif level == locking.LEVEL_NODEGROUP:
      # Lock node groups for all potential target nodes optimistically, needs
      # verification once nodes have been acquired
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)

    elif level == locking.LEVEL_NODE:
      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes

  def CheckPrereq(self):
    # Verify locks
    owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)

    need_nodes = self._DetermineNodes()

    if not owned_nodes.issuperset(need_nodes):
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
                                 " locks were acquired, current nodes are"
                                 " '%s', used to be '%s'; retry the"
                                 " operation" %
                                 (self.op.node_name,
                                  utils.CommaJoin(need_nodes),
                                  utils.CommaJoin(owned_nodes)),
                                 errors.ECODE_STATE)

    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
    if owned_groups != wanted_groups:
      raise errors.OpExecError("Node groups changed since locks were acquired,"
                               " current groups are '%s', used to be '%s';"
                               " retry the operation" %
                               (utils.CommaJoin(wanted_groups),
                                utils.CommaJoin(owned_groups)))

    # Determine affected instances
    self.instances = self._DetermineInstances()
    self.instance_names = [i.name for i in self.instances]

    if set(self.instance_names) != owned_instance_names:
      raise errors.OpExecError("Instances on node '%s' changed since locks"
                               " were acquired, current instances are '%s',"
                               " used to be '%s'; retry the operation" %
                               (self.op.node_name,
                                utils.CommaJoin(self.instance_names),
                                utils.CommaJoin(owned_instance_names)))

    if self.instance_names:
      self.LogInfo("Evacuating instances from node '%s': %s",
                   self.op.node_name,
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
    else:
      self.LogInfo("No instances to evacuate from node '%s'",
                   self.op.node_name)

    if self.op.remote_node is not None:
      for i in self.instances:
        if i.primary_node == self.op.remote_node_uuid:
          raise errors.OpPrereqError("Node %s is the primary node of"
                                     " instance %s, cannot use it as"
                                     " secondary" %
                                     (self.op.remote_node, i.name),
                                     errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)

    if not self.instance_names:
      # No instances to evacuate
      jobs = []

    elif self.op.iallocator is not None:
      # TODO: Implement relocation to other group
      req = iallocator.IAReqNodeEvac(evac_mode=self.op.mode,
                                     instances=list(self.instance_names))
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)

      ial.Run(self.op.iallocator)

      if not ial.success:
        raise errors.OpPrereqError("Can't compute node evacuation using"
                                   " iallocator '%s': %s" %
                                   (self.op.iallocator, ial.info),
                                   errors.ECODE_NORES)

      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)

    elif self.op.remote_node is not None:
      assert self.op.mode == constants.NODE_EVAC_SEC
      jobs = [
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
                                        remote_node=self.op.remote_node,
                                        disks=[],
                                        mode=constants.REPLACE_DISK_CHG,
                                        early_release=self.op.early_release)]
        for instance_name in self.instance_names]

    else:
      raise errors.ProgrammerError("No iallocator or remote node")

    return ResultWithJobs(jobs)


class LUNodeMigrate(LogicalUnit):
  """Migrate all instances from a node.

  """
  HPATH = "node-migrate"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False

  def CheckArguments(self):
    pass

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    return {
      "NODE_NAME": self.op.node_name,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()]
    return (nl, nl)

  def CheckPrereq(self):
    pass

  def Exec(self, feedback_fn):
    # Prepare jobs for migrating instances
    jobs = [
      [opcodes.OpInstanceMigrate(
        instance_name=inst.name,
        mode=self.op.mode,
        live=self.op.live,
        iallocator=self.op.iallocator,
        target_node=self.op.target_node,
        allow_runtime_changes=self.op.allow_runtime_changes,
        ignore_ipolicy=self.op.ignore_ipolicy)]
      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]

    # TODO: Run iallocator in this opcode and pass correct placement options to
    # OpInstanceMigrate. Since other jobs can modify the cluster between
    # running the iallocator and the actual migration, a good consistency model
    # will have to be found.

    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset([self.op.node_uuid]))

    return ResultWithJobs(jobs)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
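  # The result is a list of extra positional arguments for the storage RPC
  # calls, e.g. [[<file storage dir>]] for ST_FILE and [] for storage types
  # that take no arguments.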

  if storage_type == constants.ST_FILE:
    return [[cfg.GetFileStorageDir()]]
  elif storage_type == constants.ST_SHARED_FILE:
    dts = cfg.GetClusterInfo().enabled_disk_templates
    paths = []
    if constants.DT_SHARED_FILE in dts:
      paths.append(cfg.GetSharedFileStorageDir())
    if constants.DT_GLUSTER in dts:
      paths.append(cfg.GetGlusterStorageDir())
    return [paths]
  else:
    return []


class LUNodeModifyStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_uuid,
      }

  def Exec(self, feedback_fn):
    """Modifies a storage volume on a node.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_uuid,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


def _CheckOutputFields(fields, selected):
  """Checks whether all selected fields are valid according to fields.

  @type fields: L{utils.FieldSet}
  @param fields: fields set
  @type selected: L{utils.FieldSet}
  @param selected: fields set

  """
  delta = fields.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)


class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(constants.VF_NODE, constants.VF_PHYS,
                                      constants.VF_VG, constants.VF_NAME,
                                      constants.VF_SIZE, constants.VF_INSTANCE),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    volumes = self.rpc.call_node_volumes(node_uuids)

    ilist = self.cfg.GetAllInstancesInfo()
    vol2inst = MapInstanceLvsToNodes(ilist.values())
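    # vol2inst maps (node_uuid, "<vg>/<lv name>") pairs to the owning
    # instance; it backs the "instance" output field below.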
1251

    
1252
    output = []
1253
    for node_uuid in node_uuids:
1254
      nresult = volumes[node_uuid]
1255
      if nresult.offline:
1256
        continue
1257
      msg = nresult.fail_msg
1258
      if msg:
1259
        self.LogWarning("Can't compute volume data on node %s: %s",
1260
                        self.cfg.GetNodeName(node_uuid), msg)
1261
        continue
1262

    
1263
      node_vols = sorted(nresult.payload,
1264
                         key=operator.itemgetter(constants.VF_DEV))
1265

    
1266
      for vol in node_vols:
1267
        node_output = []
1268
        for field in self.op.output_fields:
1269
          if field == constants.VF_NODE:
1270
            val = self.cfg.GetNodeName(node_uuid)
1271
          elif field == constants.VF_PHYS:
1272
            val = vol[constants.VF_DEV]
1273
          elif field == constants.VF_VG:
1274
            val = vol[constants.VF_VG]
1275
          elif field == constants.VF_NAME:
1276
            val = vol[constants.VF_NAME]
1277
          elif field == constants.VF_SIZE:
1278
            val = int(float(vol[constants.VF_SIZE]))
1279
          elif field == constants.VF_INSTANCE:
1280
            inst = vol2inst.get((node_uuid, vol[constants.VF_VG] + "/" +
1281
                                 vol[constants.VF_NAME]), None)
1282
            if inst is not None:
1283
              val = inst.name
1284
            else:
1285
              val = "-"
1286
          else:
1287
            raise errors.ParameterError(field)
1288
          node_output.append(str(val))
1289

    
1290
        output.append(node_output)
1291

    
1292
    return output
1293

    
1294

    
1295
class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def _DetermineStorageType(self):
    """Determines the default storage type of the cluster.

    """
    enabled_disk_templates = self.cfg.GetClusterInfo().enabled_disk_templates
    default_storage_type = \
        constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[enabled_disk_templates[0]]
    return default_storage_type
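
  # Added note (mapping recalled from constants.py, verify there): e.g. the
  # "drbd" and "plain" disk templates both map to the "lvm-vg" storage
  # type, so a cluster whose first enabled template is "drbd" defaults to
  # reporting LVM volume-group storage.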

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.storage_type:
      CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()
      supported_storage_types = constants.STS_REPORT_NODE_STORAGE
      if self.storage_type not in supported_storage_types:
        raise errors.OpPrereqError(
            "Storage reporting for storage type '%s' is not supported. Please"
            " use the --storage-type option to specify one of the supported"
            " storage types (%s) or set the default disk template to one that"
            " supports storage reporting." %
            (self.storage_type, utils.CommaJoin(supported_storage_types)))

  def Exec(self, feedback_fn):
    """Computes the list of storage units and their attributes.

    """
    if self.op.storage_type:
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()

    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)

    # Always get the name field so we can sort by it
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as they're only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]
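
    # Added commentary: field_idx maps each field name to its column in
    # the RPC rows; e.g. fields == ["name", "size"] yields
    # {"name": 0, "size": 1}.  name_idx is then used below to key the rows
    # by unit name for sorting.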

    st_args = _GetStorageTypeArgs(self.cfg, self.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      nresult = data[node_uuid]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s",
                        node_name, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node_name
          elif field == constants.SF_TYPE:
            val = self.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result
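
  # Added usage note (illustrative, command name per the Ganeti CLI): this
  # LU backs "gnt-node list-storage", e.g.
  #   gnt-node list-storage --storage-type=lvm-vg
  # to list storage units of that type on all nodes.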


class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_uuid)
    except ValueError:
      pass
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    assert node is not None

    masternode = self.cfg.GetMasterNode()
    if node.uuid == masternode:
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)

    for _, instance in self.cfg.GetAllInstancesInfo().items():
      if node.uuid in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still hosted on the node,"
                                   " please remove it first" % instance.name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    logging.info("Stopping the node daemon and removing configs from node %s",
                 self.node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # Promote nodes to master candidate as needed
    AdjustCandidatePool(self, [self.node.uuid], feedback_fn)
    self.context.RemoveNode(self.node)

    # Run post hooks on the node before it's removed
    RunPostHook(self, self.node.name)

    # we have to call this by name rather than by UUID, as the node is no
    # longer in the config
    result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    cluster = self.cfg.GetClusterInfo()

    # Remove node from candidate certificate list
    if self.node.master_candidate:
      utils.RemoveNodeFromCandidateCerts(self.node.uuid,
                                         cluster.candidate_certs)
      self.cfg.Update(cluster, feedback_fn)

    # Remove node from our /etc/hosts
    if cluster.modify_etc_hosts:
      master_node_uuid = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node_uuid,
                                              constants.ETC_HOSTS_REMOVE,
                                              self.node.name, None)
      result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)
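
  # Added usage note (illustrative): this LU implements "gnt-node remove".
  # The ordering above matters: the candidate pool is refilled and the node
  # dropped from the configuration before node_leave_cluster is issued,
  # which is why that RPC must go by name rather than by UUID.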


class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    if (constants.SO_FIX_CONSISTENCY not in
        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
      raise errors.OpPrereqError("Storage units of type '%s' cannot be"
                                 " repaired" % storage_type,
                                 errors.ECODE_INVAL)
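
  # Added commentary: VALID_STORAGE_OPERATIONS maps each storage type to
  # the operations it supports; at the time of writing only the "lvm-vg"
  # type supports SO_FIX_CONSISTENCY (see constants.py for the
  # authoritative table), so other types are rejected here.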

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def _CheckFaultyDisks(self, instance, node_uuid):
    """Ensure faulty disks abort the opcode or at least warn."""
    try:
      if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                 node_uuid, True):
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                   " node '%s'" %
                                   (instance.name,
                                    self.cfg.GetNodeName(node_uuid)),
                                   errors.ECODE_STATE)
    except errors.OpPrereqError, err:
      if self.op.ignore_consistency:
        self.LogWarning(str(err.args[0]))
      else:
        raise

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

    # Check whether any instance on this node has faulty disks
    for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
      if not inst.disks_active:
        continue
      check_nodes = set(inst.all_nodes)
      check_nodes.discard(self.op.node_uuid)
      for inst_node_uuid in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_uuid)

  def Exec(self, feedback_fn):
    """Repairs a storage unit on a node.

    """
    feedback_fn("Repairing storage unit '%s' on %s ..." %
                (self.op.name, self.op.node_name))

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_execute(self.op.node_uuid,
                                           self.op.storage_type, st_args,
                                           self.op.name,
                                           constants.SO_FIX_CONSISTENCY)
    result.Raise("Failed to repair storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))
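
  # Added usage sketch (illustrative invocation, verify against the CLI
  # manpage): a broken volume group would typically be repaired with
  #   gnt-node repair-storage node2.example.com lvm-vg xenvg
  # which ends up in the storage_execute RPC above with
  # SO_FIX_CONSISTENCY.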