Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib / node.py @ 31d3b918

History | View | Annotate | Download (58.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with nodes."""
23

    
24
import logging
25
import operator
26

    
27
from ganeti import constants
28
from ganeti import errors
29
from ganeti import locking
30
from ganeti import netutils
31
from ganeti import objects
32
from ganeti import opcodes
33
import ganeti.rpc.node as rpc
34
from ganeti import utils
35
from ganeti.masterd import iallocator
36

    
37
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, ResultWithJobs
38
from ganeti.cmdlib.common import CheckParamsNotGlobal, \
39
  MergeAndVerifyHvState, MergeAndVerifyDiskState, \
40
  IsExclusiveStorageEnabledNode, CheckNodePVs, \
41
  RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
42
  CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
43
  AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
44
  GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
45
  FindFaultyInstanceDisks, CheckStorageTypeEnabled, CreateNewClientCert, \
46
  AddNodeCertToCandidateCerts, RemoveNodeCertFromCandidateCerts
47

    
48

    
49
def _DecideSelfPromotion(lu, exceptions=None):
50
  """Decide whether I should promote myself as a master candidate.
51

52
  """
53
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
54
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
55
  # the new node will increase mc_max with one, so:
56
  mc_should = min(mc_should + 1, cp_size)
57
  return mc_now < mc_should
58

    
59

    
60
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
61
  """Ensure that a node has the given secondary ip.
62

63
  @type lu: L{LogicalUnit}
64
  @param lu: the LU on behalf of which we make the check
65
  @type node: L{objects.Node}
66
  @param node: the node to check
67
  @type secondary_ip: string
68
  @param secondary_ip: the ip to check
69
  @type prereq: boolean
70
  @param prereq: whether to throw a prerequisite or an execute error
71
  @raise errors.OpPrereqError: if the node doesn't have the ip,
72
  and prereq=True
73
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
74

75
  """
76
  # this can be called with a new node, which has no UUID yet, so perform the
77
  # RPC call using its name
78
  result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
79
  result.Raise("Failure checking secondary ip on node %s" % node.name,
80
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
81
  if not result.payload:
82
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
83
           " please fix and re-run this command" % secondary_ip)
84
    if prereq:
85
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
86
    else:
87
      raise errors.OpExecError(msg)
88

    
89

    
90
class LUNodeAdd(LogicalUnit):
91
  """Logical unit for adding node to the cluster.
92

93
  """
94
  HPATH = "node-add"
95
  HTYPE = constants.HTYPE_NODE
96
  _NFLAGS = ["master_capable", "vm_capable"]
97

    
98
  def CheckArguments(self):
99
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
100
    # validate/normalize the node name
101
    self.hostname = netutils.GetHostname(name=self.op.node_name,
102
                                         family=self.primary_ip_family)
103
    self.op.node_name = self.hostname.name
104

    
105
    if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
106
      raise errors.OpPrereqError("Cannot readd the master node",
107
                                 errors.ECODE_STATE)
108

    
109
    if self.op.readd and self.op.group:
110
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
111
                                 " being readded", errors.ECODE_INVAL)
112

    
113
  def BuildHooksEnv(self):
114
    """Build hooks env.
115

116
    This will run on all nodes before, and on all nodes + the new node after.
117

118
    """
119
    return {
120
      "OP_TARGET": self.op.node_name,
121
      "NODE_NAME": self.op.node_name,
122
      "NODE_PIP": self.op.primary_ip,
123
      "NODE_SIP": self.op.secondary_ip,
124
      "MASTER_CAPABLE": str(self.op.master_capable),
125
      "VM_CAPABLE": str(self.op.vm_capable),
126
      }
127

    
128
  def BuildHooksNodes(self):
129
    """Build hooks nodes.
130

131
    """
132
    hook_nodes = self.cfg.GetNodeList()
133
    new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
134
    if new_node_info is not None:
135
      # Exclude added node
136
      hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))
137

    
138
    # add the new node as post hook node by name; it does not have an UUID yet
139
    return (hook_nodes, hook_nodes, [self.op.node_name, ])
140

    
141
  def CheckPrereq(self):
142
    """Check prerequisites.
143

144
    This checks:
145
     - the new node is not already in the config
146
     - it is resolvable
147
     - its parameters (single/dual homed) matches the cluster
148

149
    Any errors are signaled by raising errors.OpPrereqError.
150

151
    """
152
    node_name = self.hostname.name
153
    self.op.primary_ip = self.hostname.ip
154
    if self.op.secondary_ip is None:
155
      if self.primary_ip_family == netutils.IP6Address.family:
156
        raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
157
                                   " IPv4 address must be given as secondary",
158
                                   errors.ECODE_INVAL)
159
      self.op.secondary_ip = self.op.primary_ip
160

    
161
    secondary_ip = self.op.secondary_ip
162
    if not netutils.IP4Address.IsValid(secondary_ip):
163
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
164
                                 " address" % secondary_ip, errors.ECODE_INVAL)
165

    
166
    existing_node_info = self.cfg.GetNodeInfoByName(node_name)
167
    if not self.op.readd and existing_node_info is not None:
168
      raise errors.OpPrereqError("Node %s is already in the configuration" %
169
                                 node_name, errors.ECODE_EXISTS)
170
    elif self.op.readd and existing_node_info is None:
171
      raise errors.OpPrereqError("Node %s is not in the configuration" %
172
                                 node_name, errors.ECODE_NOENT)
173

    
174
    self.changed_primary_ip = False
175

    
176
    for existing_node in self.cfg.GetAllNodesInfo().values():
177
      if self.op.readd and node_name == existing_node.name:
178
        if existing_node.secondary_ip != secondary_ip:
179
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
180
                                     " address configuration as before",
181
                                     errors.ECODE_INVAL)
182
        if existing_node.primary_ip != self.op.primary_ip:
183
          self.changed_primary_ip = True
184

    
185
        continue
186

    
187
      if (existing_node.primary_ip == self.op.primary_ip or
188
          existing_node.secondary_ip == self.op.primary_ip or
189
          existing_node.primary_ip == secondary_ip or
190
          existing_node.secondary_ip == secondary_ip):
191
        raise errors.OpPrereqError("New node ip address(es) conflict with"
192
                                   " existing node %s" % existing_node.name,
193
                                   errors.ECODE_NOTUNIQUE)
194

    
195
    # After this 'if' block, None is no longer a valid value for the
196
    # _capable op attributes
197
    if self.op.readd:
198
      assert existing_node_info is not None, \
199
        "Can't retrieve locked node %s" % node_name
200
      for attr in self._NFLAGS:
201
        if getattr(self.op, attr) is None:
202
          setattr(self.op, attr, getattr(existing_node_info, attr))
203
    else:
204
      for attr in self._NFLAGS:
205
        if getattr(self.op, attr) is None:
206
          setattr(self.op, attr, True)
207

    
208
    if self.op.readd and not self.op.vm_capable:
209
      pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
210
      if pri or sec:
211
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
212
                                   " flag set to false, but it already holds"
213
                                   " instances" % node_name,
214
                                   errors.ECODE_STATE)
215

    
216
    # check that the type of the node (single versus dual homed) is the
217
    # same as for the master
218
    myself = self.cfg.GetMasterNodeInfo()
219
    master_singlehomed = myself.secondary_ip == myself.primary_ip
220
    newbie_singlehomed = secondary_ip == self.op.primary_ip
221
    if master_singlehomed != newbie_singlehomed:
222
      if master_singlehomed:
223
        raise errors.OpPrereqError("The master has no secondary ip but the"
224
                                   " new node has one",
225
                                   errors.ECODE_INVAL)
226
      else:
227
        raise errors.OpPrereqError("The master has a secondary ip but the"
228
                                   " new node doesn't have one",
229
                                   errors.ECODE_INVAL)
230

    
231
    # checks reachability
232
    if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
233
      raise errors.OpPrereqError("Node not reachable by ping",
234
                                 errors.ECODE_ENVIRON)
235

    
236
    if not newbie_singlehomed:
237
      # check reachability from my secondary ip to newbie's secondary ip
238
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
239
                              source=myself.secondary_ip):
240
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
241
                                   " based ping to node daemon port",
242
                                   errors.ECODE_ENVIRON)
243

    
244
    if self.op.readd:
245
      exceptions = [existing_node_info.uuid]
246
    else:
247
      exceptions = []
248

    
249
    if self.op.master_capable:
250
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
251
    else:
252
      self.master_candidate = False
253

    
254
    node_group = self.cfg.LookupNodeGroup(self.op.group)
255

    
256
    if self.op.readd:
257
      self.new_node = existing_node_info
258
    else:
259
      self.new_node = objects.Node(name=node_name,
260
                                   primary_ip=self.op.primary_ip,
261
                                   secondary_ip=secondary_ip,
262
                                   master_candidate=self.master_candidate,
263
                                   offline=False, drained=False,
264
                                   group=node_group, ndparams={})
265

    
266
    if self.op.ndparams:
267
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
268
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
269
                           "node", "cluster or group")
270

    
271
    if self.op.hv_state:
272
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)
273

    
274
    if self.op.disk_state:
275
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)
276

    
277
    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
278
    #       it a property on the base class.
279
    rpcrunner = rpc.DnsOnlyRunner()
280
    result = rpcrunner.call_version([node_name])[node_name]
281
    result.Raise("Can't get version information from node %s" % node_name)
282
    if constants.PROTOCOL_VERSION == result.payload:
283
      logging.info("Communication to node %s fine, sw version %s match",
284
                   node_name, result.payload)
285
    else:
286
      raise errors.OpPrereqError("Version mismatch master version %s,"
287
                                 " node version %s" %
288
                                 (constants.PROTOCOL_VERSION, result.payload),
289
                                 errors.ECODE_ENVIRON)
290

    
291
    vg_name = self.cfg.GetVGName()
292
    if vg_name is not None:
293
      vparams = {constants.NV_PVLIST: [vg_name]}
294
      excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
295
      cname = self.cfg.GetClusterName()
296
      result = rpcrunner.call_node_verify_light(
297
          [node_name], vparams, cname,
298
          self.cfg.GetClusterInfo().hvparams,
299
          {node_name: node_group},
300
          self.cfg.GetAllNodeGroupsInfoDict()
301
        )[node_name]
302
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
303
      if errmsgs:
304
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
305
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)
306

    
307
  def _InitOpenVSwitch(self):
308
    filled_ndparams = self.cfg.GetClusterInfo().FillND(
309
      self.new_node, self.cfg.GetNodeGroup(self.new_node.group))
310

    
311
    ovs = filled_ndparams.get(constants.ND_OVS, None)
312
    ovs_name = filled_ndparams.get(constants.ND_OVS_NAME, None)
313
    ovs_link = filled_ndparams.get(constants.ND_OVS_LINK, None)
314

    
315
    if ovs:
316
      if not ovs_link:
317
        self.LogInfo("No physical interface for OpenvSwitch was given."
318
                     " OpenvSwitch will not have an outside connection. This"
319
                     " might not be what you want.")
320

    
321
      result = self.rpc.call_node_configure_ovs(
322
                 self.new_node.name, ovs_name, ovs_link)
323
      result.Raise("Failed to initialize OpenVSwitch on new node")
324

    
325
  def Exec(self, feedback_fn):
326
    """Adds the new node to the cluster.
327

328
    """
329
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
330
      "Not owning BGL"
331

    
332
    # We adding a new node so we assume it's powered
333
    self.new_node.powered = True
334

    
335
    # for re-adds, reset the offline/drained/master-candidate flags;
336
    # we need to reset here, otherwise offline would prevent RPC calls
337
    # later in the procedure; this also means that if the re-add
338
    # fails, we are left with a non-offlined, broken node
339
    if self.op.readd:
340
      self.new_node.offline = False
341
      self.new_node.drained = False
342
      self.LogInfo("Readding a node, the offline/drained flags were reset")
343
      # if we demote the node, we do cleanup later in the procedure
344
      self.new_node.master_candidate = self.master_candidate
345
      if self.changed_primary_ip:
346
        self.new_node.primary_ip = self.op.primary_ip
347

    
348
    # copy the master/vm_capable flags
349
    for attr in self._NFLAGS:
350
      setattr(self.new_node, attr, getattr(self.op, attr))
351

    
352
    # notify the user about any possible mc promotion
353
    if self.new_node.master_candidate:
354
      self.LogInfo("Node will be a master candidate")
355

    
356
    if self.op.ndparams:
357
      self.new_node.ndparams = self.op.ndparams
358
    else:
359
      self.new_node.ndparams = {}
360

    
361
    if self.op.hv_state:
362
      self.new_node.hv_state_static = self.new_hv_state
363

    
364
    if self.op.disk_state:
365
      self.new_node.disk_state_static = self.new_disk_state
366

    
367
    # Add node to our /etc/hosts, and add key to known_hosts
368
    if self.cfg.GetClusterInfo().modify_etc_hosts:
369
      master_node = self.cfg.GetMasterNode()
370
      result = self.rpc.call_etc_hosts_modify(
371
                 master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
372
                 self.hostname.ip)
373
      result.Raise("Can't update hosts file with new host data")
374

    
375
    if self.new_node.secondary_ip != self.new_node.primary_ip:
376
      _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
377
                               False)
378

    
379
    node_verifier_uuids = [self.cfg.GetMasterNode()]
380
    node_verify_param = {
381
      constants.NV_NODELIST: ([self.new_node.name], {}),
382
      # TODO: do a node-net-test as well?
383
    }
384

    
385
    result = self.rpc.call_node_verify(
386
               node_verifier_uuids, node_verify_param,
387
               self.cfg.GetClusterName(),
388
               self.cfg.GetClusterInfo().hvparams,
389
               {self.new_node.name: self.cfg.LookupNodeGroup(self.op.group)},
390
               self.cfg.GetAllNodeGroupsInfoDict()
391
               )
392
    for verifier in node_verifier_uuids:
393
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
394
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
395
      if nl_payload:
396
        for failed in nl_payload:
397
          feedback_fn("ssh/hostname verification failed"
398
                      " (checking from %s): %s" %
399
                      (verifier, nl_payload[failed]))
400
        raise errors.OpExecError("ssh/hostname verification failed")
401

    
402
    self._InitOpenVSwitch()
403

    
404
    if self.op.readd:
405
      self.context.ReaddNode(self.new_node)
406
      RedistributeAncillaryFiles(self)
407
      # make sure we redistribute the config
408
      self.cfg.Update(self.new_node, feedback_fn)
409
      # and make sure the new node will not have old files around
410
      if not self.new_node.master_candidate:
411
        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
412
        result.Warn("Node failed to demote itself from master candidate status",
413
                    self.LogWarning)
414
    else:
415
      self.context.AddNode(self.new_node, self.proc.GetECId())
416
      RedistributeAncillaryFiles(self)
417

    
418
    cluster = self.cfg.GetClusterInfo()
419
    # We create a new certificate even if the node is readded
420
    digest = CreateNewClientCert(self, self.new_node.uuid)
421
    if self.new_node.master_candidate:
422
      utils.AddNodeToCandidateCerts(self.new_node.uuid, digest,
423
                                    cluster.candidate_certs)
424
      self.cfg.Update(cluster, feedback_fn)
425
    else:
426
      if self.new_node.uuid in cluster.candidate_certs:
427
        utils.RemoveNodeFromCandidateCerts(self.new_node.uuid,
428
                                           cluster.candidate_certs)
429
        self.cfg.Update(cluster, feedback_fn)
430

    
431

    
432
class LUNodeSetParams(LogicalUnit):
433
  """Modifies the parameters of a node.
434

435
  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
436
      to the node role (as _ROLE_*)
437
  @cvar _R2F: a dictionary from node role to tuples of flags
438
  @cvar _FLAGS: a list of attribute names corresponding to the flags
439

440
  """
441
  HPATH = "node-modify"
442
  HTYPE = constants.HTYPE_NODE
443
  REQ_BGL = False
444
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
445
  _F2R = {
446
    (True, False, False): _ROLE_CANDIDATE,
447
    (False, True, False): _ROLE_DRAINED,
448
    (False, False, True): _ROLE_OFFLINE,
449
    (False, False, False): _ROLE_REGULAR,
450
    }
451
  _R2F = dict((v, k) for k, v in _F2R.items())
452
  _FLAGS = ["master_candidate", "drained", "offline"]
453

    
454
  def CheckArguments(self):
455
    (self.op.node_uuid, self.op.node_name) = \
456
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
457
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
458
                self.op.master_capable, self.op.vm_capable,
459
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
460
                self.op.disk_state]
461
    if all_mods.count(None) == len(all_mods):
462
      raise errors.OpPrereqError("Please pass at least one modification",
463
                                 errors.ECODE_INVAL)
464
    if all_mods.count(True) > 1:
465
      raise errors.OpPrereqError("Can't set the node into more than one"
466
                                 " state at the same time",
467
                                 errors.ECODE_INVAL)
468

    
469
    # Boolean value that tells us whether we might be demoting from MC
470
    self.might_demote = (self.op.master_candidate is False or
471
                         self.op.offline is True or
472
                         self.op.drained is True or
473
                         self.op.master_capable is False)
474

    
475
    if self.op.secondary_ip:
476
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
477
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
478
                                   " address" % self.op.secondary_ip,
479
                                   errors.ECODE_INVAL)
480

    
481
    self.lock_all = self.op.auto_promote and self.might_demote
482
    self.lock_instances = self.op.secondary_ip is not None
483

    
484
  def _InstanceFilter(self, instance):
485
    """Filter for getting affected instances.
486

487
    """
488
    return (instance.disk_template in constants.DTS_INT_MIRROR and
489
            self.op.node_uuid in instance.all_nodes)
490

    
491
  def ExpandNames(self):
492
    if self.lock_all:
493
      self.needed_locks = {
494
        locking.LEVEL_NODE: locking.ALL_SET,
495

    
496
        # Block allocations when all nodes are locked
497
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
498
        }
499
    else:
500
      self.needed_locks = {
501
        locking.LEVEL_NODE: self.op.node_uuid,
502
        }
503

    
504
    # Since modifying a node can have severe effects on currently running
505
    # operations the resource lock is at least acquired in shared mode
506
    self.needed_locks[locking.LEVEL_NODE_RES] = \
507
      self.needed_locks[locking.LEVEL_NODE]
508

    
509
    # Get all locks except nodes in shared mode; they are not used for anything
510
    # but read-only access
511
    self.share_locks = ShareAll()
512
    self.share_locks[locking.LEVEL_NODE] = 0
513
    self.share_locks[locking.LEVEL_NODE_RES] = 0
514
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 0
515

    
516
    if self.lock_instances:
517
      self.needed_locks[locking.LEVEL_INSTANCE] = \
518
        self.cfg.GetInstanceNames(
519
          self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())
520

    
521
  def BuildHooksEnv(self):
522
    """Build hooks env.
523

524
    This runs on the master node.
525

526
    """
527
    return {
528
      "OP_TARGET": self.op.node_name,
529
      "MASTER_CANDIDATE": str(self.op.master_candidate),
530
      "OFFLINE": str(self.op.offline),
531
      "DRAINED": str(self.op.drained),
532
      "MASTER_CAPABLE": str(self.op.master_capable),
533
      "VM_CAPABLE": str(self.op.vm_capable),
534
      }
535

    
536
  def BuildHooksNodes(self):
537
    """Build hooks nodes.
538

539
    """
540
    nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
541
    return (nl, nl)
542

    
543
  def CheckPrereq(self):
544
    """Check prerequisites.
545

546
    This only checks the instance list against the existing names.
547

548
    """
549
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
550
    if self.lock_instances:
551
      affected_instances = \
552
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
553

    
554
      # Verify instance locks
555
      owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
556
      wanted_instance_names = frozenset([inst.name for inst in
557
                                         affected_instances.values()])
558
      if wanted_instance_names - owned_instance_names:
559
        raise errors.OpPrereqError("Instances affected by changing node %s's"
560
                                   " secondary IP address have changed since"
561
                                   " locks were acquired, wanted '%s', have"
562
                                   " '%s'; retry the operation" %
563
                                   (node.name,
564
                                    utils.CommaJoin(wanted_instance_names),
565
                                    utils.CommaJoin(owned_instance_names)),
566
                                   errors.ECODE_STATE)
567
    else:
568
      affected_instances = None
569

    
570
    if (self.op.master_candidate is not None or
571
        self.op.drained is not None or
572
        self.op.offline is not None):
573
      # we can't change the master's node flags
574
      if node.uuid == self.cfg.GetMasterNode():
575
        raise errors.OpPrereqError("The master role can be changed"
576
                                   " only via master-failover",
577
                                   errors.ECODE_INVAL)
578

    
579
    if self.op.master_candidate and not node.master_capable:
580
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
581
                                 " it a master candidate" % node.name,
582
                                 errors.ECODE_STATE)
583

    
584
    if self.op.vm_capable is False:
585
      (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
586
      if ipri or isec:
587
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
588
                                   " the vm_capable flag" % node.name,
589
                                   errors.ECODE_STATE)
590

    
591
    if node.master_candidate and self.might_demote and not self.lock_all:
592
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
593
      # check if after removing the current node, we're missing master
594
      # candidates
595
      (mc_remaining, mc_should, _) = \
596
          self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
597
      if mc_remaining < mc_should:
598
        raise errors.OpPrereqError("Not enough master candidates, please"
599
                                   " pass auto promote option to allow"
600
                                   " promotion (--auto-promote or RAPI"
601
                                   " auto_promote=True)", errors.ECODE_STATE)
602

    
603
    self.old_flags = old_flags = (node.master_candidate,
604
                                  node.drained, node.offline)
605
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
606
    self.old_role = old_role = self._F2R[old_flags]
607

    
608
    # Check for ineffective changes
609
    for attr in self._FLAGS:
610
      if getattr(self.op, attr) is False and getattr(node, attr) is False:
611
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
612
        setattr(self.op, attr, None)
613

    
614
    # Past this point, any flag change to False means a transition
615
    # away from the respective state, as only real changes are kept
616

    
617
    # TODO: We might query the real power state if it supports OOB
618
    if SupportsOob(self.cfg, node):
619
      if self.op.offline is False and not (node.powered or
620
                                           self.op.powered is True):
621
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
622
                                    " offline status can be reset") %
623
                                   self.op.node_name, errors.ECODE_STATE)
624
    elif self.op.powered is not None:
625
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
626
                                  " as it does not support out-of-band"
627
                                  " handling") % self.op.node_name,
628
                                 errors.ECODE_STATE)
629

    
630
    # If we're being deofflined/drained, we'll MC ourself if needed
631
    if (self.op.drained is False or self.op.offline is False or
632
        (self.op.master_capable and not node.master_capable)):
633
      if _DecideSelfPromotion(self):
634
        self.op.master_candidate = True
635
        self.LogInfo("Auto-promoting node to master candidate")
636

    
637
    # If we're no longer master capable, we'll demote ourselves from MC
638
    if self.op.master_capable is False and node.master_candidate:
639
      self.LogInfo("Demoting from master candidate")
640
      self.op.master_candidate = False
641

    
642
    # Compute new role
643
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
644
    if self.op.master_candidate:
645
      new_role = self._ROLE_CANDIDATE
646
    elif self.op.drained:
647
      new_role = self._ROLE_DRAINED
648
    elif self.op.offline:
649
      new_role = self._ROLE_OFFLINE
650
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
651
      # False is still in new flags, which means we're un-setting (the
652
      # only) True flag
653
      new_role = self._ROLE_REGULAR
654
    else: # no new flags, nothing, keep old role
655
      new_role = old_role
656

    
657
    self.new_role = new_role
658

    
659
    if old_role == self._ROLE_OFFLINE and new_role != old_role:
660
      # Trying to transition out of offline status
661
      result = self.rpc.call_version([node.uuid])[node.uuid]
662
      if result.fail_msg:
663
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
664
                                   " to report its version: %s" %
665
                                   (node.name, result.fail_msg),
666
                                   errors.ECODE_STATE)
667
      else:
668
        self.LogWarning("Transitioning node from offline to online state"
669
                        " without using re-add. Please make sure the node"
670
                        " is healthy!")
671

    
672
    # When changing the secondary ip, verify if this is a single-homed to
673
    # multi-homed transition or vice versa, and apply the relevant
674
    # restrictions.
675
    if self.op.secondary_ip:
676
      # Ok even without locking, because this can't be changed by any LU
677
      master = self.cfg.GetMasterNodeInfo()
678
      master_singlehomed = master.secondary_ip == master.primary_ip
679
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
680
        if self.op.force and node.uuid == master.uuid:
681
          self.LogWarning("Transitioning from single-homed to multi-homed"
682
                          " cluster; all nodes will require a secondary IP"
683
                          " address")
684
        else:
685
          raise errors.OpPrereqError("Changing the secondary ip on a"
686
                                     " single-homed cluster requires the"
687
                                     " --force option to be passed, and the"
688
                                     " target node to be the master",
689
                                     errors.ECODE_INVAL)
690
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
691
        if self.op.force and node.uuid == master.uuid:
692
          self.LogWarning("Transitioning from multi-homed to single-homed"
693
                          " cluster; secondary IP addresses will have to be"
694
                          " removed")
695
        else:
696
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
697
                                     " same as the primary IP on a multi-homed"
698
                                     " cluster, unless the --force option is"
699
                                     " passed, and the target node is the"
700
                                     " master", errors.ECODE_INVAL)
701

    
702
      assert not (set([inst.name for inst in affected_instances.values()]) -
703
                  self.owned_locks(locking.LEVEL_INSTANCE))
704

    
705
      if node.offline:
706
        if affected_instances:
707
          msg = ("Cannot change secondary IP address: offline node has"
708
                 " instances (%s) configured to use it" %
709
                 utils.CommaJoin(
710
                   [inst.name for inst in affected_instances.values()]))
711
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
712
      else:
713
        # On online nodes, check that no instances are running, and that
714
        # the node has the new ip and we can reach it.
715
        for instance in affected_instances.values():
716
          CheckInstanceState(self, instance, INSTANCE_DOWN,
717
                             msg="cannot change secondary ip")
718

    
719
        _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
720
        if master.uuid != node.uuid:
721
          # check reachability from master secondary ip to new secondary ip
722
          if not netutils.TcpPing(self.op.secondary_ip,
723
                                  constants.DEFAULT_NODED_PORT,
724
                                  source=master.secondary_ip):
725
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
726
                                       " based ping to node daemon port",
727
                                       errors.ECODE_ENVIRON)
728

    
729
    if self.op.ndparams:
730
      new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
731
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
732
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
733
                           "node", "cluster or group")
734
      self.new_ndparams = new_ndparams
735

    
736
    if self.op.hv_state:
737
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
738
                                                node.hv_state_static)
739

    
740
    if self.op.disk_state:
741
      self.new_disk_state = \
742
        MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)
743

    
744
  def Exec(self, feedback_fn):
745
    """Modifies a node.
746

747
    """
748
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
749
    result = []
750

    
751
    if self.op.ndparams:
752
      node.ndparams = self.new_ndparams
753

    
754
    if self.op.powered is not None:
755
      node.powered = self.op.powered
756

    
757
    if self.op.hv_state:
758
      node.hv_state_static = self.new_hv_state
759

    
760
    if self.op.disk_state:
761
      node.disk_state_static = self.new_disk_state
762

    
763
    for attr in ["master_capable", "vm_capable"]:
764
      val = getattr(self.op, attr)
765
      if val is not None:
766
        setattr(node, attr, val)
767
        result.append((attr, str(val)))
768

    
769
    if self.new_role != self.old_role:
770
      # Tell the node to demote itself, if no longer MC and not offline
771
      if self.old_role == self._ROLE_CANDIDATE and \
772
          self.new_role != self._ROLE_OFFLINE:
773
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
774
        if msg:
775
          self.LogWarning("Node failed to demote itself: %s", msg)
776

    
777
      new_flags = self._R2F[self.new_role]
778
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
779
        if of != nf:
780
          result.append((desc, str(nf)))
781
      (node.master_candidate, node.drained, node.offline) = new_flags
782

    
783
      # we locked all nodes, we adjust the CP before updating this node
784
      if self.lock_all:
785
        AdjustCandidatePool(self, [node.uuid], feedback_fn)
786

    
787
      cluster = self.cfg.GetClusterInfo()
788
      # if node gets promoted, grant RPC priviledges
789
      if self.new_role == self._ROLE_CANDIDATE:
790
        AddNodeCertToCandidateCerts(self, node.uuid, cluster)
791
      # if node is demoted, revoke RPC priviledges
792
      if self.old_role == self._ROLE_CANDIDATE:
793
        RemoveNodeCertFromCandidateCerts(node.uuid, cluster)
794

    
795
    if self.op.secondary_ip:
796
      node.secondary_ip = self.op.secondary_ip
797
      result.append(("secondary_ip", self.op.secondary_ip))
798

    
799
    # this will trigger configuration file update, if needed
800
    self.cfg.Update(node, feedback_fn)
801

    
802
    # this will trigger job queue propagation or cleanup if the mc
803
    # flag changed
804
    if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
805
      self.context.ReaddNode(node)
806

    
807
    return result
808

    
809

    
810
class LUNodePowercycle(NoHooksLU):
811
  """Powercycles a node.
812

813
  """
814
  REQ_BGL = False
815

    
816
  def CheckArguments(self):
817
    (self.op.node_uuid, self.op.node_name) = \
818
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
819

    
820
    if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
821
      raise errors.OpPrereqError("The node is the master and the force"
822
                                 " parameter was not set",
823
                                 errors.ECODE_INVAL)
824

    
825
  def ExpandNames(self):
826
    """Locking for PowercycleNode.
827

828
    This is a last-resort option and shouldn't block on other
829
    jobs. Therefore, we grab no locks.
830

831
    """
832
    self.needed_locks = {}
833

    
834
  def Exec(self, feedback_fn):
835
    """Reboots a node.
836

837
    """
838
    default_hypervisor = self.cfg.GetHypervisorType()
839
    hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
840
    result = self.rpc.call_node_powercycle(self.op.node_uuid,
841
                                           default_hypervisor,
842
                                           hvparams)
843
    result.Raise("Failed to schedule the reboot")
844
    return result.payload
845

    
846

    
847
def _GetNodeInstancesInner(cfg, fn):
848
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
849

    
850

    
851
def _GetNodePrimaryInstances(cfg, node_uuid):
852
  """Returns primary instances on a node.
853

854
  """
855
  return _GetNodeInstancesInner(cfg,
856
                                lambda inst: node_uuid == inst.primary_node)
857

    
858

    
859
def _GetNodeSecondaryInstances(cfg, node_uuid):
860
  """Returns secondary instances on a node.
861

862
  """
863
  return _GetNodeInstancesInner(cfg,
864
                                lambda inst: node_uuid in inst.secondary_nodes)
865

    
866

    
867
def _GetNodeInstances(cfg, node_uuid):
868
  """Returns a list of all primary and secondary instances on a node.
869

870
  """
871

    
872
  return _GetNodeInstancesInner(cfg, lambda inst: node_uuid in inst.all_nodes)
873

    
874

    
875
class LUNodeEvacuate(NoHooksLU):
876
  """Evacuates instances off a list of nodes.
877

878
  """
879
  REQ_BGL = False
880

    
881
  def CheckArguments(self):
882
    CheckIAllocatorOrNode(self, "iallocator", "remote_node")
883

    
884
  def ExpandNames(self):
885
    (self.op.node_uuid, self.op.node_name) = \
886
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
887

    
888
    if self.op.remote_node is not None:
889
      (self.op.remote_node_uuid, self.op.remote_node) = \
890
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
891
                              self.op.remote_node)
892
      assert self.op.remote_node
893

    
894
      if self.op.node_uuid == self.op.remote_node_uuid:
895
        raise errors.OpPrereqError("Can not use evacuated node as a new"
896
                                   " secondary node", errors.ECODE_INVAL)
897

    
898
      if self.op.mode != constants.NODE_EVAC_SEC:
899
        raise errors.OpPrereqError("Without the use of an iallocator only"
900
                                   " secondary instances can be evacuated",
901
                                   errors.ECODE_INVAL)
902

    
903
    # Declare locks
904
    self.share_locks = ShareAll()
905
    self.needed_locks = {
906
      locking.LEVEL_INSTANCE: [],
907
      locking.LEVEL_NODEGROUP: [],
908
      locking.LEVEL_NODE: [],
909
      }
910

    
911
    # Determine nodes (via group) optimistically, needs verification once locks
912
    # have been acquired
913
    self.lock_nodes = self._DetermineNodes()
914

    
915
  def _DetermineNodes(self):
916
    """Gets the list of node UUIDs to operate on.
917

918
    """
919
    if self.op.remote_node is None:
920
      # Iallocator will choose any node(s) in the same group
921
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
922
    else:
923
      group_nodes = frozenset([self.op.remote_node_uuid])
924

    
925
    # Determine nodes to be locked
926
    return set([self.op.node_uuid]) | group_nodes
927

    
928
  def _DetermineInstances(self):
929
    """Builds list of instances to operate on.
930

931
    """
932
    assert self.op.mode in constants.NODE_EVAC_MODES
933

    
934
    if self.op.mode == constants.NODE_EVAC_PRI:
935
      # Primary instances only
936
      inst_fn = _GetNodePrimaryInstances
937
      assert self.op.remote_node is None, \
938
        "Evacuating primary instances requires iallocator"
939
    elif self.op.mode == constants.NODE_EVAC_SEC:
940
      # Secondary instances only
941
      inst_fn = _GetNodeSecondaryInstances
942
    else:
943
      # All instances
944
      assert self.op.mode == constants.NODE_EVAC_ALL
945
      inst_fn = _GetNodeInstances
946
      # TODO: In 2.6, change the iallocator interface to take an evacuation mode
947
      # per instance
948
      raise errors.OpPrereqError("Due to an issue with the iallocator"
949
                                 " interface it is not possible to evacuate"
950
                                 " all instances at once; specify explicitly"
951
                                 " whether to evacuate primary or secondary"
952
                                 " instances",
953
                                 errors.ECODE_INVAL)
954

    
955
    return inst_fn(self.cfg, self.op.node_uuid)
956

    
957
  def DeclareLocks(self, level):
958
    if level == locking.LEVEL_INSTANCE:
959
      # Lock instances optimistically, needs verification once node and group
960
      # locks have been acquired
961
      self.needed_locks[locking.LEVEL_INSTANCE] = \
962
        set(i.name for i in self._DetermineInstances())
963

    
964
    elif level == locking.LEVEL_NODEGROUP:
965
      # Lock node groups for all potential target nodes optimistically, needs
966
      # verification once nodes have been acquired
967
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
968
        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)
969

    
970
    elif level == locking.LEVEL_NODE:
971
      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
972

    
973
  def CheckPrereq(self):
974
    # Verify locks
975
    owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
976
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
977
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
978

    
979
    need_nodes = self._DetermineNodes()
980

    
981
    if not owned_nodes.issuperset(need_nodes):
982
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
983
                                 " locks were acquired, current nodes are"
984
                                 " are '%s', used to be '%s'; retry the"
985
                                 " operation" %
986
                                 (self.op.node_name,
987
                                  utils.CommaJoin(need_nodes),
988
                                  utils.CommaJoin(owned_nodes)),
989
                                 errors.ECODE_STATE)
990

    
991
    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
992
    if owned_groups != wanted_groups:
993
      raise errors.OpExecError("Node groups changed since locks were acquired,"
994
                               " current groups are '%s', used to be '%s';"
995
                               " retry the operation" %
996
                               (utils.CommaJoin(wanted_groups),
997
                                utils.CommaJoin(owned_groups)))
998

    
999
    # Determine affected instances
1000
    self.instances = self._DetermineInstances()
1001
    self.instance_names = [i.name for i in self.instances]
1002

    
1003
    if set(self.instance_names) != owned_instance_names:
1004
      raise errors.OpExecError("Instances on node '%s' changed since locks"
1005
                               " were acquired, current instances are '%s',"
1006
                               " used to be '%s'; retry the operation" %
1007
                               (self.op.node_name,
1008
                                utils.CommaJoin(self.instance_names),
1009
                                utils.CommaJoin(owned_instance_names)))
1010

    
1011
    if self.instance_names:
1012
      self.LogInfo("Evacuating instances from node '%s': %s",
1013
                   self.op.node_name,
1014
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
1015
    else:
1016
      self.LogInfo("No instances to evacuate from node '%s'",
1017
                   self.op.node_name)
1018

    
1019
    if self.op.remote_node is not None:
1020
      for i in self.instances:
1021
        if i.primary_node == self.op.remote_node_uuid:
1022
          raise errors.OpPrereqError("Node %s is the primary node of"
1023
                                     " instance %s, cannot use it as"
1024
                                     " secondary" %
1025
                                     (self.op.remote_node, i.name),
1026
                                     errors.ECODE_INVAL)
1027

    
1028
  def Exec(self, feedback_fn):
1029
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)
1030

    
1031
    if not self.instance_names:
1032
      # No instances to evacuate
1033
      jobs = []
1034

    
1035
    elif self.op.iallocator is not None:
1036
      # TODO: Implement relocation to other group
1037
      req = iallocator.IAReqNodeEvac(evac_mode=self.op.mode,
1038
                                     instances=list(self.instance_names))
1039
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)
1040

    
1041
      ial.Run(self.op.iallocator)
1042

    
1043
      if not ial.success:
1044
        raise errors.OpPrereqError("Can't compute node evacuation using"
1045
                                   " iallocator '%s': %s" %
1046
                                   (self.op.iallocator, ial.info),
1047
                                   errors.ECODE_NORES)
1048

    
1049
      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)
1050

    
1051
    elif self.op.remote_node is not None:
1052
      assert self.op.mode == constants.NODE_EVAC_SEC
1053
      jobs = [
1054
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
1055
                                        remote_node=self.op.remote_node,
1056
                                        disks=[],
1057
                                        mode=constants.REPLACE_DISK_CHG,
1058
                                        early_release=self.op.early_release)]
1059
        for instance_name in self.instance_names]
1060

    
1061
    else:
1062
      raise errors.ProgrammerError("No iallocator or remote node")
1063

    
1064
    return ResultWithJobs(jobs)
1065

    
1066

    
1067
class LUNodeMigrate(LogicalUnit):
1068
  """Migrate all instances from a node.
1069

1070
  """
1071
  HPATH = "node-migrate"
1072
  HTYPE = constants.HTYPE_NODE
1073
  REQ_BGL = False
1074

    
1075
  def CheckArguments(self):
1076
    pass
1077

    
1078
  def ExpandNames(self):
1079
    (self.op.node_uuid, self.op.node_name) = \
1080
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1081

    
1082
    self.share_locks = ShareAll()
1083
    self.needed_locks = {
1084
      locking.LEVEL_NODE: [self.op.node_uuid],
1085
      }
1086

    
1087
  def BuildHooksEnv(self):
1088
    """Build hooks env.
1089

1090
    This runs on the master, the primary and all the secondaries.
1091

1092
    """
1093
    return {
1094
      "NODE_NAME": self.op.node_name,
1095
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
1096
      }
1097

    
1098
  def BuildHooksNodes(self):
1099
    """Build hooks nodes.
1100

1101
    """
1102
    nl = [self.cfg.GetMasterNode()]
1103
    return (nl, nl)
1104

    
1105
  def CheckPrereq(self):
1106
    pass
1107

    
1108
  def Exec(self, feedback_fn):
1109
    # Prepare jobs for migration instances
1110
    jobs = [
1111
      [opcodes.OpInstanceMigrate(
1112
        instance_name=inst.name,
1113
        mode=self.op.mode,
1114
        live=self.op.live,
1115
        iallocator=self.op.iallocator,
1116
        target_node=self.op.target_node,
1117
        allow_runtime_changes=self.op.allow_runtime_changes,
1118
        ignore_ipolicy=self.op.ignore_ipolicy)]
1119
      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]
1120

    
1121
    # TODO: Run iallocator in this opcode and pass correct placement options to
1122
    # OpInstanceMigrate. Since other jobs can modify the cluster between
1123
    # running the iallocator and the actual migration, a good consistency model
1124
    # will have to be found.
1125

    
1126
    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
1127
            frozenset([self.op.node_uuid]))
1128

    
1129
    return ResultWithJobs(jobs)
1130

    
1131

    
1132
def _GetStorageTypeArgs(cfg, storage_type):
1133
  """Returns the arguments for a storage type.
1134

1135
  """
1136
  # Special case for file storage
1137

    
1138
  if storage_type == constants.ST_FILE:
1139
    return [[cfg.GetFileStorageDir()]]
1140
  elif storage_type == constants.ST_SHARED_FILE:
1141
    dts = cfg.GetClusterInfo().enabled_disk_templates
1142
    paths = []
1143
    if constants.DT_SHARED_FILE in dts:
1144
      paths.append(cfg.GetSharedFileStorageDir())
1145
    if constants.DT_GLUSTER in dts:
1146
      paths.append(cfg.GetGlusterStorageDir())
1147
    return [paths]
1148
  else:
1149
    return []
1150

    
1151

    
1152
class LUNodeModifyStorage(NoHooksLU):
1153
  """Logical unit for modifying a storage volume on a node.
1154

1155
  """
1156
  REQ_BGL = False
1157

    
1158
  def CheckArguments(self):
1159
    (self.op.node_uuid, self.op.node_name) = \
1160
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1161

    
1162
    storage_type = self.op.storage_type
1163

    
1164
    try:
1165
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
1166
    except KeyError:
1167
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
1168
                                 " modified" % storage_type,
1169
                                 errors.ECODE_INVAL)
1170

    
1171
    diff = set(self.op.changes.keys()) - modifiable
1172
    if diff:
1173
      raise errors.OpPrereqError("The following fields can not be modified for"
1174
                                 " storage units of type '%s': %r" %
1175
                                 (storage_type, list(diff)),
1176
                                 errors.ECODE_INVAL)
1177

    
1178
  def CheckPrereq(self):
1179
    """Check prerequisites.
1180

1181
    """
1182
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1183

    
1184
  def ExpandNames(self):
1185
    self.needed_locks = {
1186
      locking.LEVEL_NODE: self.op.node_uuid,
1187
      }
1188

    
1189
  def Exec(self, feedback_fn):
1190
    """Computes the list of nodes and their attributes.
1191

1192
    """
1193
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1194
    result = self.rpc.call_storage_modify(self.op.node_uuid,
1195
                                          self.op.storage_type, st_args,
1196
                                          self.op.name, self.op.changes)
1197
    result.Raise("Failed to modify storage unit '%s' on %s" %
1198
                 (self.op.name, self.op.node_name))
1199

    
1200

    
1201
def _CheckOutputFields(fields, selected):
1202
  """Checks whether all selected fields are valid according to fields.
1203

1204
  @type fields: L{utils.FieldSet}
1205
  @param fields: fields set
1206
  @type selected: L{utils.FieldSet}
1207
  @param selected: fields set
1208

1209
  """
1210
  delta = fields.NonMatching(selected)
1211
  if delta:
1212
    raise errors.OpPrereqError("Unknown output fields selected: %s"
1213
                               % ",".join(delta), errors.ECODE_INVAL)
1214

    
1215

    
1216
class LUNodeQueryvols(NoHooksLU):
1217
  """Logical unit for getting volumes on node(s).
1218

1219
  """
1220
  REQ_BGL = False
1221

    
1222
  def CheckArguments(self):
1223
    _CheckOutputFields(utils.FieldSet(constants.VF_NODE, constants.VF_PHYS,
1224
                                      constants.VF_VG, constants.VF_NAME,
1225
                                      constants.VF_SIZE, constants.VF_INSTANCE),
1226
                       self.op.output_fields)
1227

    
1228
  def ExpandNames(self):
1229
    self.share_locks = ShareAll()
1230

    
1231
    if self.op.nodes:
1232
      self.needed_locks = {
1233
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
1234
        }
1235
    else:
1236
      self.needed_locks = {
1237
        locking.LEVEL_NODE: locking.ALL_SET,
1238
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1239
        }
1240

    
1241
  def Exec(self, feedback_fn):
1242
    """Computes the list of nodes and their attributes.
1243

1244
    """
1245
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
1246
    volumes = self.rpc.call_node_volumes(node_uuids)
1247

    
1248
    ilist = self.cfg.GetAllInstancesInfo()
1249
    vol2inst = MapInstanceLvsToNodes(ilist.values())
1250

    
1251
    output = []
1252
    for node_uuid in node_uuids:
1253
      nresult = volumes[node_uuid]
1254
      if nresult.offline:
1255
        continue
1256
      msg = nresult.fail_msg
1257
      if msg:
1258
        self.LogWarning("Can't compute volume data on node %s: %s",
1259
                        self.cfg.GetNodeName(node_uuid), msg)
1260
        continue
1261

    
1262
      node_vols = sorted(nresult.payload,
1263
                         key=operator.itemgetter(constants.VF_DEV))
1264

    
1265
      for vol in node_vols:
1266
        node_output = []
1267
        for field in self.op.output_fields:
1268
          if field == constants.VF_NODE:
1269
            val = self.cfg.GetNodeName(node_uuid)
1270
          elif field == constants.VF_PHYS:
1271
            val = vol[constants.VF_DEV]
1272
          elif field == constants.VF_VG:
1273
            val = vol[constants.VF_VG]
1274
          elif field == constants.VF_NAME:
1275
            val = vol[constants.VF_NAME]
1276
          elif field == constants.VF_SIZE:
1277
            val = int(float(vol[constants.VF_SIZE]))
1278
          elif field == constants.VF_INSTANCE:
1279
            inst = vol2inst.get((node_uuid, vol[constants.VF_VG] + "/" +
1280
                                 vol[constants.VF_NAME]), None)
1281
            if inst is not None:
1282
              val = inst.name
1283
            else:
1284
              val = "-"
1285
          else:
1286
            raise errors.ParameterError(field)
1287
          node_output.append(str(val))
1288

    
1289
        output.append(node_output)
1290

    
1291
    return output
1292

    
1293

    
1294
class LUNodeQueryStorage(NoHooksLU):
1295
  """Logical unit for getting information on storage units on node(s).
1296

1297
  """
1298
  REQ_BGL = False
1299

    
1300
  def CheckArguments(self):
1301
    _CheckOutputFields(utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
1302
                       self.op.output_fields)
1303

    
1304
  def ExpandNames(self):
1305
    self.share_locks = ShareAll()
1306

    
1307
    if self.op.nodes:
1308
      self.needed_locks = {
1309
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
1310
        }
1311
    else:
1312
      self.needed_locks = {
1313
        locking.LEVEL_NODE: locking.ALL_SET,
1314
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1315
        }
1316

    
1317
  def _DetermineStorageType(self):
1318
    """Determines the default storage type of the cluster.
1319

1320
    """
1321
    enabled_disk_templates = self.cfg.GetClusterInfo().enabled_disk_templates
1322
    default_storage_type = \
1323
        constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[enabled_disk_templates[0]]
1324
    return default_storage_type
1325

    
1326
  def CheckPrereq(self):
1327
    """Check prerequisites.
1328

1329
    """
1330
    if self.op.storage_type:
1331
      CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1332
      self.storage_type = self.op.storage_type
1333
    else:
1334
      self.storage_type = self._DetermineStorageType()
1335
      supported_storage_types = constants.STS_REPORT_NODE_STORAGE
1336
      if self.storage_type not in supported_storage_types:
1337
        raise errors.OpPrereqError(
1338
            "Storage reporting for storage type '%s' is not supported. Please"
1339
            " use the --storage-type option to specify one of the supported"
1340
            " storage types (%s) or set the default disk template to one that"
1341
            " supports storage reporting." %
1342
            (self.storage_type, utils.CommaJoin(supported_storage_types)))
1343

    
1344
  def Exec(self, feedback_fn):
1345
    """Computes the list of nodes and their attributes.
1346

1347
    """
1348
    if self.op.storage_type:
1349
      self.storage_type = self.op.storage_type
1350
    else:
1351
      self.storage_type = self._DetermineStorageType()
1352

    
1353
    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)
1354

    
1355
    # Always get name to sort by
1356
    if constants.SF_NAME in self.op.output_fields:
1357
      fields = self.op.output_fields[:]
1358
    else:
1359
      fields = [constants.SF_NAME] + self.op.output_fields
1360

    
1361
    # Never ask for node or type as it's only known to the LU
1362
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
1363
      while extra in fields:
1364
        fields.remove(extra)
1365

    
1366
    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
1367
    name_idx = field_idx[constants.SF_NAME]
1368

    
1369
    st_args = _GetStorageTypeArgs(self.cfg, self.storage_type)
1370
    data = self.rpc.call_storage_list(self.node_uuids,
1371
                                      self.storage_type, st_args,
1372
                                      self.op.name, fields)
1373

    
1374
    result = []
1375

    
1376
    for node_uuid in utils.NiceSort(self.node_uuids):
1377
      node_name = self.cfg.GetNodeName(node_uuid)
1378
      nresult = data[node_uuid]
1379
      if nresult.offline:
1380
        continue
1381

    
1382
      msg = nresult.fail_msg
1383
      if msg:
1384
        self.LogWarning("Can't get storage data from node %s: %s",
1385
                        node_name, msg)
1386
        continue
1387

    
1388
      rows = dict([(row[name_idx], row) for row in nresult.payload])
1389

    
1390
      for name in utils.NiceSort(rows.keys()):
1391
        row = rows[name]
1392

    
1393
        out = []
1394

    
1395
        for field in self.op.output_fields:
1396
          if field == constants.SF_NODE:
1397
            val = node_name
1398
          elif field == constants.SF_TYPE:
1399
            val = self.storage_type
1400
          elif field in field_idx:
1401
            val = row[field_idx[field]]
1402
          else:
1403
            raise errors.ParameterError(field)
1404

    
1405
          out.append(val)
1406

    
1407
        result.append(out)
1408

    
1409
    return result
1410

    
1411

    
1412
class LUNodeRemove(LogicalUnit):
1413
  """Logical unit for removing a node.
1414

1415
  """
1416
  HPATH = "node-remove"
1417
  HTYPE = constants.HTYPE_NODE
1418

    
1419
  def BuildHooksEnv(self):
1420
    """Build hooks env.
1421

1422
    """
1423
    return {
1424
      "OP_TARGET": self.op.node_name,
1425
      "NODE_NAME": self.op.node_name,
1426
      }
1427

    
1428
  def BuildHooksNodes(self):
1429
    """Build hooks nodes.
1430

1431
    This doesn't run on the target node in the pre phase as a failed
1432
    node would then be impossible to remove.
1433

1434
    """
1435
    all_nodes = self.cfg.GetNodeList()
1436
    try:
1437
      all_nodes.remove(self.op.node_uuid)
1438
    except ValueError:
1439
      pass
1440
    return (all_nodes, all_nodes)
1441

    
1442
  def CheckPrereq(self):
1443
    """Check prerequisites.
1444

1445
    This checks:
1446
     - the node exists in the configuration
1447
     - it does not have primary or secondary instances
1448
     - it's not the master
1449

1450
    Any errors are signaled by raising errors.OpPrereqError.
1451

1452
    """
1453
    (self.op.node_uuid, self.op.node_name) = \
1454
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1455
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
1456
    assert node is not None
1457

    
1458
    masternode = self.cfg.GetMasterNode()
1459
    if node.uuid == masternode:
1460
      raise errors.OpPrereqError("Node is the master node, failover to another"
1461
                                 " node is required", errors.ECODE_INVAL)
1462

    
1463
    for _, instance in self.cfg.GetAllInstancesInfo().items():
1464
      if node.uuid in instance.all_nodes:
1465
        raise errors.OpPrereqError("Instance %s is still running on the node,"
1466
                                   " please remove first" % instance.name,
1467
                                   errors.ECODE_INVAL)
1468
    self.op.node_name = node.name
1469
    self.node = node
1470

    
1471
  def Exec(self, feedback_fn):
1472
    """Removes the node from the cluster.
1473

1474
    """
1475
    logging.info("Stopping the node daemon and removing configs from node %s",
1476
                 self.node.name)
1477

    
1478
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
1479

    
1480
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
1481
      "Not owning BGL"
1482

    
1483
    # Promote nodes to master candidate as needed
1484
    AdjustCandidatePool(self, [self.node.uuid], feedback_fn)
1485
    self.context.RemoveNode(self.node)
1486

    
1487
    # Run post hooks on the node before it's removed
1488
    RunPostHook(self, self.node.name)
1489

    
1490
    # we have to call this by name rather than by UUID, as the node is no longer
1491
    # in the config
1492
    result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
1493
    msg = result.fail_msg
1494
    if msg:
1495
      self.LogWarning("Errors encountered on the remote node while leaving"
1496
                      " the cluster: %s", msg)
1497

    
1498
    cluster = self.cfg.GetClusterInfo()
1499

    
1500
    # Remove node from candidate certificate list
1501
    if self.node.master_candidate:
1502
      utils.RemoveNodeFromCandidateCerts(self.node.uuid,
1503
                                         cluster.candidate_certs)
1504
      self.cfg.Update(cluster, feedback_fn)
1505

    
1506
    # Remove node from our /etc/hosts
1507
    if cluster.modify_etc_hosts:
1508
      master_node_uuid = self.cfg.GetMasterNode()
1509
      result = self.rpc.call_etc_hosts_modify(master_node_uuid,
1510
                                              constants.ETC_HOSTS_REMOVE,
1511
                                              self.node.name, None)
1512
      result.Raise("Can't update hosts file with new host data")
1513
      RedistributeAncillaryFiles(self)
1514

    
1515

    
1516
class LURepairNodeStorage(NoHooksLU):
1517
  """Repairs the volume group on a node.
1518

1519
  """
1520
  REQ_BGL = False
1521

    
1522
  def CheckArguments(self):
1523
    (self.op.node_uuid, self.op.node_name) = \
1524
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1525

    
1526
    storage_type = self.op.storage_type
1527

    
1528
    if (constants.SO_FIX_CONSISTENCY not in
1529
        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
1530
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
1531
                                 " repaired" % storage_type,
1532
                                 errors.ECODE_INVAL)
1533

    
1534
  def ExpandNames(self):
1535
    self.needed_locks = {
1536
      locking.LEVEL_NODE: [self.op.node_uuid],
1537
      }
1538

    
1539
  def _CheckFaultyDisks(self, instance, node_uuid):
1540
    """Ensure faulty disks abort the opcode or at least warn."""
1541
    try:
1542
      if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
1543
                                 node_uuid, True):
1544
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
1545
                                   " node '%s'" %
1546
                                   (instance.name,
1547
                                    self.cfg.GetNodeName(node_uuid)),
1548
                                   errors.ECODE_STATE)
1549
    except errors.OpPrereqError, err:
1550
      if self.op.ignore_consistency:
1551
        self.LogWarning(str(err.args[0]))
1552
      else:
1553
        raise
1554

    
1555
  def CheckPrereq(self):
1556
    """Check prerequisites.
1557

1558
    """
1559
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1560

    
1561
    # Check whether any instance on this node has faulty disks
1562
    for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
1563
      if not inst.disks_active:
1564
        continue
1565
      check_nodes = set(inst.all_nodes)
1566
      check_nodes.discard(self.op.node_uuid)
1567
      for inst_node_uuid in check_nodes:
1568
        self._CheckFaultyDisks(inst, inst_node_uuid)
1569

    
1570
  def Exec(self, feedback_fn):
1571
    feedback_fn("Repairing storage unit '%s' on %s ..." %
1572
                (self.op.name, self.op.node_name))
1573

    
1574
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1575
    result = self.rpc.call_storage_execute(self.op.node_uuid,
1576
                                           self.op.storage_type, st_args,
1577
                                           self.op.name,
1578
                                           constants.SO_FIX_CONSISTENCY)
1579
    result.Raise("Failed to repair storage unit '%s' on %s" %
1580
                 (self.op.name, self.op.node_name))