#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Logical units dealing with nodes."""
23

    
24
import logging
import operator

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
import ganeti.rpc.node as rpc
from ganeti import utils
from ganeti.masterd import iallocator

from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, ResultWithJobs
from ganeti.cmdlib.common import CheckParamsNotGlobal, \
  MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  IsExclusiveStorageEnabledNode, CheckNodePVs, \
  RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
  CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
  AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
  GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
  FindFaultyInstanceDisks, CheckStorageTypeEnabled, CreateNewClientCert, \
  AddNodeCertToCandidateCerts, RemoveNodeCertFromCandidateCerts
47

    
48

    
49
def _DecideSelfPromotion(lu, exceptions=None):
50
  """Decide whether I should promote myself as a master candidate.
51

52
  """
53
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
54
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
55
  # the new node will increase mc_max with one, so:
56
  mc_should = min(mc_should + 1, cp_size)
57
  return mc_now < mc_should
58

    
59

    
60
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
61
  """Ensure that a node has the given secondary ip.
62

63
  @type lu: L{LogicalUnit}
64
  @param lu: the LU on behalf of which we make the check
65
  @type node: L{objects.Node}
66
  @param node: the node to check
67
  @type secondary_ip: string
68
  @param secondary_ip: the ip to check
69
  @type prereq: boolean
70
  @param prereq: whether to throw a prerequisite or an execute error
71
  @raise errors.OpPrereqError: if the node doesn't have the ip,
72
  and prereq=True
73
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
74

75
  """
76
  # this can be called with a new node, which has no UUID yet, so perform the
77
  # RPC call using its name
78
  result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
79
  result.Raise("Failure checking secondary ip on node %s" % node.name,
80
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
81
  if not result.payload:
82
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
83
           " please fix and re-run this command" % secondary_ip)
84
    if prereq:
85
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
86
    else:
87
      raise errors.OpExecError(msg)
88

    
89

    
90
class LUNodeAdd(LogicalUnit):
91
  """Logical unit for adding node to the cluster.
92

93
  """
94
  HPATH = "node-add"
95
  HTYPE = constants.HTYPE_NODE
96
  _NFLAGS = ["master_capable", "vm_capable"]
97

    
98
  def CheckArguments(self):
99
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
100
    # validate/normalize the node name
101
    self.hostname = netutils.GetHostname(name=self.op.node_name,
102
                                         family=self.primary_ip_family)
103
    self.op.node_name = self.hostname.name
104

    
105
    if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
106
      raise errors.OpPrereqError("Cannot readd the master node",
107
                                 errors.ECODE_STATE)
108

    
109
    if self.op.readd and self.op.group:
110
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
111
                                 " being readded", errors.ECODE_INVAL)
112

    
113
  def BuildHooksEnv(self):
114
    """Build hooks env.
115

116
    This will run on all nodes before, and on all nodes + the new node after.
117

118
    """
119
    return {
120
      "OP_TARGET": self.op.node_name,
121
      "NODE_NAME": self.op.node_name,
122
      "NODE_PIP": self.op.primary_ip,
123
      "NODE_SIP": self.op.secondary_ip,
124
      "MASTER_CAPABLE": str(self.op.master_capable),
125
      "VM_CAPABLE": str(self.op.vm_capable),
126
      }
127

    
128
  def BuildHooksNodes(self):
129
    """Build hooks nodes.
130

131
    """
132
    hook_nodes = self.cfg.GetNodeList()
133
    new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
134
    if new_node_info is not None:
135
      # Exclude added node
136
      hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))
137

    
138
    # add the new node as post hook node by name; it does not have an UUID yet
139
    return (hook_nodes, hook_nodes)
140

    
141
  def PreparePostHookNodes(self, post_hook_node_uuids):
142
    return post_hook_node_uuids + [self.new_node.uuid]
143

    
144
  def CheckPrereq(self):
145
    """Check prerequisites.
146

147
    This checks:
148
     - the new node is not already in the config
149
     - it is resolvable
150
     - its parameters (single/dual homed) matches the cluster
151

152
    Any errors are signaled by raising errors.OpPrereqError.
153

154
    """
155
    node_name = self.hostname.name
156
    self.op.primary_ip = self.hostname.ip
157
    if self.op.secondary_ip is None:
158
      if self.primary_ip_family == netutils.IP6Address.family:
159
        raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
160
                                   " IPv4 address must be given as secondary",
161
                                   errors.ECODE_INVAL)
162
      self.op.secondary_ip = self.op.primary_ip
163

    
164
    secondary_ip = self.op.secondary_ip
165
    if not netutils.IP4Address.IsValid(secondary_ip):
166
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
167
                                 " address" % secondary_ip, errors.ECODE_INVAL)
168

    
169
    existing_node_info = self.cfg.GetNodeInfoByName(node_name)
170
    if not self.op.readd and existing_node_info is not None:
171
      raise errors.OpPrereqError("Node %s is already in the configuration" %
172
                                 node_name, errors.ECODE_EXISTS)
173
    elif self.op.readd and existing_node_info is None:
174
      raise errors.OpPrereqError("Node %s is not in the configuration" %
175
                                 node_name, errors.ECODE_NOENT)
176

    
177
    self.changed_primary_ip = False
178

    
179
    for existing_node in self.cfg.GetAllNodesInfo().values():
180
      if self.op.readd and node_name == existing_node.name:
181
        if existing_node.secondary_ip != secondary_ip:
182
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
183
                                     " address configuration as before",
184
                                     errors.ECODE_INVAL)
185
        if existing_node.primary_ip != self.op.primary_ip:
186
          self.changed_primary_ip = True
187

    
188
        continue
189

    
190
      if (existing_node.primary_ip == self.op.primary_ip or
191
          existing_node.secondary_ip == self.op.primary_ip or
192
          existing_node.primary_ip == secondary_ip or
193
          existing_node.secondary_ip == secondary_ip):
194
        raise errors.OpPrereqError("New node ip address(es) conflict with"
195
                                   " existing node %s" % existing_node.name,
196
                                   errors.ECODE_NOTUNIQUE)
197

    
198
    # After this 'if' block, None is no longer a valid value for the
199
    # _capable op attributes
200
    if self.op.readd:
201
      assert existing_node_info is not None, \
202
        "Can't retrieve locked node %s" % node_name
203
      for attr in self._NFLAGS:
204
        if getattr(self.op, attr) is None:
205
          setattr(self.op, attr, getattr(existing_node_info, attr))
206
    else:
207
      for attr in self._NFLAGS:
208
        if getattr(self.op, attr) is None:
209
          setattr(self.op, attr, True)
210

    
211
    if self.op.readd and not self.op.vm_capable:
212
      pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
213
      if pri or sec:
214
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
215
                                   " flag set to false, but it already holds"
216
                                   " instances" % node_name,
217
                                   errors.ECODE_STATE)
218

    
219
    # check that the type of the node (single versus dual homed) is the
220
    # same as for the master
221
    myself = self.cfg.GetMasterNodeInfo()
222
    master_singlehomed = myself.secondary_ip == myself.primary_ip
223
    newbie_singlehomed = secondary_ip == self.op.primary_ip
224
    if master_singlehomed != newbie_singlehomed:
225
      if master_singlehomed:
226
        raise errors.OpPrereqError("The master has no secondary ip but the"
227
                                   " new node has one",
228
                                   errors.ECODE_INVAL)
229
      else:
230
        raise errors.OpPrereqError("The master has a secondary ip but the"
231
                                   " new node doesn't have one",
232
                                   errors.ECODE_INVAL)
233

    
234
    # checks reachability
235
    if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
236
      raise errors.OpPrereqError("Node not reachable by ping",
237
                                 errors.ECODE_ENVIRON)
238

    
239
    if not newbie_singlehomed:
240
      # check reachability from my secondary ip to newbie's secondary ip
241
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
242
                              source=myself.secondary_ip):
243
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
244
                                   " based ping to node daemon port",
245
                                   errors.ECODE_ENVIRON)
246

    
247
    if self.op.readd:
248
      exceptions = [existing_node_info.uuid]
249
    else:
250
      exceptions = []
251

    
252
    if self.op.master_capable:
253
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
254
    else:
255
      self.master_candidate = False
256

    
257
    node_group = self.cfg.LookupNodeGroup(self.op.group)
258

    
259
    if self.op.readd:
260
      self.new_node = existing_node_info
261
    else:
262
      self.new_node = objects.Node(name=node_name,
263
                                   primary_ip=self.op.primary_ip,
264
                                   secondary_ip=secondary_ip,
265
                                   master_candidate=self.master_candidate,
266
                                   offline=False, drained=False,
267
                                   group=node_group, ndparams={})
268

    
269
    if self.op.ndparams:
270
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
271
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
272
                           "node", "cluster or group")
273

    
274
    if self.op.hv_state:
275
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)
276

    
277
    if self.op.disk_state:
278
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)
279

    
280
    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
281
    #       it a property on the base class.
282
    rpcrunner = rpc.DnsOnlyRunner()
283
    result = rpcrunner.call_version([node_name])[node_name]
284
    result.Raise("Can't get version information from node %s" % node_name)
285
    if constants.PROTOCOL_VERSION == result.payload:
286
      logging.info("Communication to node %s fine, sw version %s match",
287
                   node_name, result.payload)
288
    else:
289
      raise errors.OpPrereqError("Version mismatch master version %s,"
290
                                 " node version %s" %
291
                                 (constants.PROTOCOL_VERSION, result.payload),
292
                                 errors.ECODE_ENVIRON)
293

    
294
    vg_name = self.cfg.GetVGName()
295
    if vg_name is not None:
296
      vparams = {constants.NV_PVLIST: [vg_name]}
297
      excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
298
      cname = self.cfg.GetClusterName()
299
      result = rpcrunner.call_node_verify_light(
300
          [node_name], vparams, cname,
301
          self.cfg.GetClusterInfo().hvparams,
302
          {node_name: node_group},
303
          self.cfg.GetAllNodeGroupsInfoDict()
304
        )[node_name]
305
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
306
      if errmsgs:
307
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
308
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)
309

    
310
  def _InitOpenVSwitch(self):
311
    filled_ndparams = self.cfg.GetClusterInfo().FillND(
312
      self.new_node, self.cfg.GetNodeGroup(self.new_node.group))
313

    
314
    ovs = filled_ndparams.get(constants.ND_OVS, None)
315
    ovs_name = filled_ndparams.get(constants.ND_OVS_NAME, None)
316
    ovs_link = filled_ndparams.get(constants.ND_OVS_LINK, None)
317

    
318
    if ovs:
319
      if not ovs_link:
320
        self.LogInfo("No physical interface for OpenvSwitch was given."
321
                     " OpenvSwitch will not have an outside connection. This"
322
                     " might not be what you want.")
323

    
324
      result = self.rpc.call_node_configure_ovs(
325
                 self.new_node.name, ovs_name, ovs_link)
326
      result.Raise("Failed to initialize OpenVSwitch on new node")
327

    
328
  def Exec(self, feedback_fn):
329
    """Adds the new node to the cluster.
330

331
    """
332
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
333
      "Not owning BGL"
334

    
335
    # We adding a new node so we assume it's powered
336
    self.new_node.powered = True
337

    
338
    # for re-adds, reset the offline/drained/master-candidate flags;
339
    # we need to reset here, otherwise offline would prevent RPC calls
340
    # later in the procedure; this also means that if the re-add
341
    # fails, we are left with a non-offlined, broken node
342
    if self.op.readd:
343
      self.new_node.offline = False
344
      self.new_node.drained = False
345
      self.LogInfo("Readding a node, the offline/drained flags were reset")
346
      # if we demote the node, we do cleanup later in the procedure
347
      self.new_node.master_candidate = self.master_candidate
348
      if self.changed_primary_ip:
349
        self.new_node.primary_ip = self.op.primary_ip
350

    
351
    # copy the master/vm_capable flags
352
    for attr in self._NFLAGS:
353
      setattr(self.new_node, attr, getattr(self.op, attr))
354

    
355
    # notify the user about any possible mc promotion
356
    if self.new_node.master_candidate:
357
      self.LogInfo("Node will be a master candidate")
358

    
359
    if self.op.ndparams:
360
      self.new_node.ndparams = self.op.ndparams
361
    else:
362
      self.new_node.ndparams = {}
363

    
364
    if self.op.hv_state:
365
      self.new_node.hv_state_static = self.new_hv_state
366

    
367
    if self.op.disk_state:
368
      self.new_node.disk_state_static = self.new_disk_state
369

    
370
    # Add node to our /etc/hosts, and add key to known_hosts
371
    if self.cfg.GetClusterInfo().modify_etc_hosts:
372
      master_node = self.cfg.GetMasterNode()
373
      result = self.rpc.call_etc_hosts_modify(
374
                 master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
375
                 self.hostname.ip)
376
      result.Raise("Can't update hosts file with new host data")
377

    
378
    if self.new_node.secondary_ip != self.new_node.primary_ip:
379
      _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
380
                               False)
381

    
382
    node_verifier_uuids = [self.cfg.GetMasterNode()]
383
    node_verify_param = {
384
      constants.NV_NODELIST: ([self.new_node.name], {}),
385
      # TODO: do a node-net-test as well?
386
    }
387

    
388
    result = self.rpc.call_node_verify(
389
               node_verifier_uuids, node_verify_param,
390
               self.cfg.GetClusterName(),
391
               self.cfg.GetClusterInfo().hvparams,
392
               {self.new_node.name: self.cfg.LookupNodeGroup(self.op.group)},
393
               self.cfg.GetAllNodeGroupsInfoDict()
394
               )
395
    for verifier in node_verifier_uuids:
396
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
397
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
398
      if nl_payload:
399
        for failed in nl_payload:
400
          feedback_fn("ssh/hostname verification failed"
401
                      " (checking from %s): %s" %
402
                      (verifier, nl_payload[failed]))
403
        raise errors.OpExecError("ssh/hostname verification failed")
404

    
405
    self._InitOpenVSwitch()
406

    
407
    if self.op.readd:
408
      self.context.ReaddNode(self.new_node)
409
      RedistributeAncillaryFiles(self)
410
      # make sure we redistribute the config
411
      self.cfg.Update(self.new_node, feedback_fn)
412
      # and make sure the new node will not have old files around
413
      if not self.new_node.master_candidate:
414
        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
415
        result.Warn("Node failed to demote itself from master candidate status",
416
                    self.LogWarning)
417
    else:
418
      self.context.AddNode(self.new_node, self.proc.GetECId())
419
      RedistributeAncillaryFiles(self)
420

    
421
    # We create a new certificate even if the node is readded
422
    digest = CreateNewClientCert(self, self.new_node.uuid)
423
    if self.new_node.master_candidate:
424
      self.cfg.AddNodeToCandidateCerts(self.new_node.uuid, digest)
425
    else:
426
      self.cfg.RemoveNodeFromCandidateCerts(self.new_node.uuid, warn_fn=None)
427

    
428

    
429
class LUNodeSetParams(LogicalUnit):
430
  """Modifies the parameters of a node.
431

432
  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
433
      to the node role (as _ROLE_*)
434
  @cvar _R2F: a dictionary from node role to tuples of flags
435
  @cvar _FLAGS: a list of attribute names corresponding to the flags
436

437
  """
438
  HPATH = "node-modify"
439
  HTYPE = constants.HTYPE_NODE
440
  REQ_BGL = False
441
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
442
  _F2R = {
443
    (True, False, False): _ROLE_CANDIDATE,
444
    (False, True, False): _ROLE_DRAINED,
445
    (False, False, True): _ROLE_OFFLINE,
446
    (False, False, False): _ROLE_REGULAR,
447
    }
448
  _R2F = dict((v, k) for k, v in _F2R.items())
449
  _FLAGS = ["master_candidate", "drained", "offline"]
450

    
451
  def CheckArguments(self):
452
    (self.op.node_uuid, self.op.node_name) = \
453
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
454
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
455
                self.op.master_capable, self.op.vm_capable,
456
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
457
                self.op.disk_state]
458
    if all_mods.count(None) == len(all_mods):
459
      raise errors.OpPrereqError("Please pass at least one modification",
460
                                 errors.ECODE_INVAL)
461
    if all_mods.count(True) > 1:
462
      raise errors.OpPrereqError("Can't set the node into more than one"
463
                                 " state at the same time",
464
                                 errors.ECODE_INVAL)
465

    
466
    # Boolean value that tells us whether we might be demoting from MC
467
    self.might_demote = (self.op.master_candidate is False or
468
                         self.op.offline is True or
469
                         self.op.drained is True or
470
                         self.op.master_capable is False)
471

    
472
    if self.op.secondary_ip:
473
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
474
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
475
                                   " address" % self.op.secondary_ip,
476
                                   errors.ECODE_INVAL)
477

    
478
    self.lock_all = self.op.auto_promote and self.might_demote
479
    self.lock_instances = self.op.secondary_ip is not None
480

    
481
  def _InstanceFilter(self, instance):
482
    """Filter for getting affected instances.
483

484
    """
485
    return (instance.disk_template in constants.DTS_INT_MIRROR and
486
            self.op.node_uuid in self.cfg.GetInstanceNodes(instance))
487

    
488
  def ExpandNames(self):
489
    if self.lock_all:
490
      self.needed_locks = {
491
        locking.LEVEL_NODE: locking.ALL_SET,
492

    
493
        # Block allocations when all nodes are locked
494
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
495
        }
496
    else:
497
      self.needed_locks = {
498
        locking.LEVEL_NODE: self.op.node_uuid,
499
        }
500

    
501
    # Since modifying a node can have severe effects on currently running
502
    # operations the resource lock is at least acquired in shared mode
503
    self.needed_locks[locking.LEVEL_NODE_RES] = \
504
      self.needed_locks[locking.LEVEL_NODE]
505

    
506
    # Get all locks except nodes in shared mode; they are not used for anything
507
    # but read-only access
508
    self.share_locks = ShareAll()
509
    self.share_locks[locking.LEVEL_NODE] = 0
510
    self.share_locks[locking.LEVEL_NODE_RES] = 0
511
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 0
512

    
513
    if self.lock_instances:
514
      self.needed_locks[locking.LEVEL_INSTANCE] = \
515
        self.cfg.GetInstanceNames(
516
          self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())
517

    
518
  def BuildHooksEnv(self):
519
    """Build hooks env.
520

521
    This runs on the master node.
522

523
    """
524
    return {
525
      "OP_TARGET": self.op.node_name,
526
      "MASTER_CANDIDATE": str(self.op.master_candidate),
527
      "OFFLINE": str(self.op.offline),
528
      "DRAINED": str(self.op.drained),
529
      "MASTER_CAPABLE": str(self.op.master_capable),
530
      "VM_CAPABLE": str(self.op.vm_capable),
531
      }
532

    
533
  def BuildHooksNodes(self):
534
    """Build hooks nodes.
535

536
    """
537
    nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
538
    return (nl, nl)
539

    
540
  def CheckPrereq(self):
541
    """Check prerequisites.
542

543
    This only checks the instance list against the existing names.
544

545
    """
546
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
547
    if self.lock_instances:
548
      affected_instances = \
549
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
550

    
551
      # Verify instance locks
552
      owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
553
      wanted_instance_names = frozenset([inst.name for inst in
554
                                         affected_instances.values()])
555
      if wanted_instance_names - owned_instance_names:
556
        raise errors.OpPrereqError("Instances affected by changing node %s's"
557
                                   " secondary IP address have changed since"
558
                                   " locks were acquired, wanted '%s', have"
559
                                   " '%s'; retry the operation" %
560
                                   (node.name,
561
                                    utils.CommaJoin(wanted_instance_names),
562
                                    utils.CommaJoin(owned_instance_names)),
563
                                   errors.ECODE_STATE)
564
    else:
565
      affected_instances = None
566

    
567
    if (self.op.master_candidate is not None or
568
        self.op.drained is not None or
569
        self.op.offline is not None):
570
      # we can't change the master's node flags
571
      if node.uuid == self.cfg.GetMasterNode():
572
        raise errors.OpPrereqError("The master role can be changed"
573
                                   " only via master-failover",
574
                                   errors.ECODE_INVAL)
575

    
576
    if self.op.master_candidate and not node.master_capable:
577
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
578
                                 " it a master candidate" % node.name,
579
                                 errors.ECODE_STATE)
580

    
581
    if self.op.vm_capable is False:
582
      (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
583
      if ipri or isec:
584
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
585
                                   " the vm_capable flag" % node.name,
586
                                   errors.ECODE_STATE)
587

    
588
    if node.master_candidate and self.might_demote and not self.lock_all:
589
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
590
      # check if after removing the current node, we're missing master
591
      # candidates
592
      (mc_remaining, mc_should, _) = \
593
          self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
594
      if mc_remaining < mc_should:
595
        raise errors.OpPrereqError("Not enough master candidates, please"
596
                                   " pass auto promote option to allow"
597
                                   " promotion (--auto-promote or RAPI"
598
                                   " auto_promote=True)", errors.ECODE_STATE)
599

    
600
    self.old_flags = old_flags = (node.master_candidate,
601
                                  node.drained, node.offline)
602
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
603
    self.old_role = old_role = self._F2R[old_flags]
604

    
605
    # Check for ineffective changes
606
    for attr in self._FLAGS:
607
      if getattr(self.op, attr) is False and getattr(node, attr) is False:
608
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
609
        setattr(self.op, attr, None)
610

    
611
    # Past this point, any flag change to False means a transition
612
    # away from the respective state, as only real changes are kept
613

    
614
    # TODO: We might query the real power state if it supports OOB
615
    if SupportsOob(self.cfg, node):
616
      if self.op.offline is False and not (node.powered or
617
                                           self.op.powered is True):
618
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
619
                                    " offline status can be reset") %
620
                                   self.op.node_name, errors.ECODE_STATE)
621
    elif self.op.powered is not None:
622
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
623
                                  " as it does not support out-of-band"
624
                                  " handling") % self.op.node_name,
625
                                 errors.ECODE_STATE)
626

    
627
    # If we're being deofflined/drained, we'll MC ourself if needed
628
    if (self.op.drained is False or self.op.offline is False or
629
        (self.op.master_capable and not node.master_capable)):
630
      if _DecideSelfPromotion(self):
631
        self.op.master_candidate = True
632
        self.LogInfo("Auto-promoting node to master candidate")
633

    
634
    # If we're no longer master capable, we'll demote ourselves from MC
635
    if self.op.master_capable is False and node.master_candidate:
636
      self.LogInfo("Demoting from master candidate")
637
      self.op.master_candidate = False
638

    
639
    # Compute new role
640
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
641
    if self.op.master_candidate:
642
      new_role = self._ROLE_CANDIDATE
643
    elif self.op.drained:
644
      new_role = self._ROLE_DRAINED
645
    elif self.op.offline:
646
      new_role = self._ROLE_OFFLINE
647
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
648
      # False is still in new flags, which means we're un-setting (the
649
      # only) True flag
650
      new_role = self._ROLE_REGULAR
651
    else: # no new flags, nothing, keep old role
652
      new_role = old_role
653

    
654
    self.new_role = new_role
655

    
656
    if old_role == self._ROLE_OFFLINE and new_role != old_role:
657
      # Trying to transition out of offline status
658
      result = self.rpc.call_version([node.uuid])[node.uuid]
659
      if result.fail_msg:
660
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
661
                                   " to report its version: %s" %
662
                                   (node.name, result.fail_msg),
663
                                   errors.ECODE_STATE)
664
      else:
665
        self.LogWarning("Transitioning node from offline to online state"
666
                        " without using re-add. Please make sure the node"
667
                        " is healthy!")
668

    
669
    # When changing the secondary ip, verify if this is a single-homed to
670
    # multi-homed transition or vice versa, and apply the relevant
671
    # restrictions.
672
    if self.op.secondary_ip:
673
      # Ok even without locking, because this can't be changed by any LU
674
      master = self.cfg.GetMasterNodeInfo()
675
      master_singlehomed = master.secondary_ip == master.primary_ip
676
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
677
        if self.op.force and node.uuid == master.uuid:
678
          self.LogWarning("Transitioning from single-homed to multi-homed"
679
                          " cluster; all nodes will require a secondary IP"
680
                          " address")
681
        else:
682
          raise errors.OpPrereqError("Changing the secondary ip on a"
683
                                     " single-homed cluster requires the"
684
                                     " --force option to be passed, and the"
685
                                     " target node to be the master",
686
                                     errors.ECODE_INVAL)
687
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
688
        if self.op.force and node.uuid == master.uuid:
689
          self.LogWarning("Transitioning from multi-homed to single-homed"
690
                          " cluster; secondary IP addresses will have to be"
691
                          " removed")
692
        else:
693
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
694
                                     " same as the primary IP on a multi-homed"
695
                                     " cluster, unless the --force option is"
696
                                     " passed, and the target node is the"
697
                                     " master", errors.ECODE_INVAL)
698

    
699
      assert not (set([inst.name for inst in affected_instances.values()]) -
700
                  self.owned_locks(locking.LEVEL_INSTANCE))
701

    
702
      if node.offline:
703
        if affected_instances:
704
          msg = ("Cannot change secondary IP address: offline node has"
705
                 " instances (%s) configured to use it" %
706
                 utils.CommaJoin(
707
                   [inst.name for inst in affected_instances.values()]))
708
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
709
      else:
710
        # On online nodes, check that no instances are running, and that
711
        # the node has the new ip and we can reach it.
712
        for instance in affected_instances.values():
713
          CheckInstanceState(self, instance, INSTANCE_DOWN,
714
                             msg="cannot change secondary ip")
715

    
716
        _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
717
        if master.uuid != node.uuid:
718
          # check reachability from master secondary ip to new secondary ip
719
          if not netutils.TcpPing(self.op.secondary_ip,
720
                                  constants.DEFAULT_NODED_PORT,
721
                                  source=master.secondary_ip):
722
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
723
                                       " based ping to node daemon port",
724
                                       errors.ECODE_ENVIRON)
725

    
726
    if self.op.ndparams:
727
      new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
728
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
729
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
730
                           "node", "cluster or group")
731
      self.new_ndparams = new_ndparams
732

    
733
    if self.op.hv_state:
734
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
735
                                                node.hv_state_static)
736

    
737
    if self.op.disk_state:
738
      self.new_disk_state = \
739
        MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)
740

    
741
  def Exec(self, feedback_fn):
    """Modifies a node.

    Applies the parameter changes validated in CheckPrereq to the node
    object, adjusts candidate-pool membership and RPC privileges when
    the node's role changes, and writes the updated node back to the
    configuration.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    changes = []

    # Simple parameter updates first; these were pre-validated in
    # CheckPrereq and stored on self.
    if self.op.ndparams:
      node.ndparams = self.new_ndparams

    if self.op.powered is not None:
      node.powered = self.op.powered

    if self.op.hv_state:
      node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      node.disk_state_static = self.new_disk_state

    for attr in ["master_capable", "vm_capable"]:
      val = getattr(self.op, attr)
      if val is not None:
        setattr(node, attr, val)
        changes.append((attr, str(val)))

    if self.new_role != self.old_role:
      # Tell the node to demote itself, if no longer MC and not offline
      if (self.old_role == self._ROLE_CANDIDATE and
          self.new_role != self._ROLE_OFFLINE):
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s", msg)

      new_flags = self._R2F[self.new_role]
      for old_flag, new_flag, desc in zip(self.old_flags, new_flags,
                                          self._FLAGS):
        if old_flag != new_flag:
          changes.append((desc, str(new_flag)))
      (node.master_candidate, node.drained, node.offline) = new_flags

      # We locked all nodes, so we adjust the candidate pool before
      # updating this node
      if self.lock_all:
        AdjustCandidatePool(self, [node.uuid])

      # If the node gets promoted, grant RPC privileges
      if self.new_role == self._ROLE_CANDIDATE:
        AddNodeCertToCandidateCerts(self, self.cfg, node.uuid)
      # If the node is demoted, revoke RPC privileges
      if self.old_role == self._ROLE_CANDIDATE:
        RemoveNodeCertFromCandidateCerts(self.cfg, node.uuid)

    if self.op.secondary_ip:
      node.secondary_ip = self.op.secondary_ip
      changes.append(("secondary_ip", self.op.secondary_ip))

    # This will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)

    # This will trigger job queue propagation or cleanup if the mc
    # flag changed
    if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
      self.context.ReaddNode(node)

    return changes
804

    
805

    
806
class LUNodePowercycle(NoHooksLU):
  """Power-cycles (hard-reboots) a node via its node daemon.

  """
  REQ_BGL = False

  def CheckArguments(self):
    # Resolve the node specification into a (uuid, name) pair
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    # Refuse to powercycle the master unless explicitly forced
    is_master = self.op.node_uuid == self.cfg.GetMasterNode()
    if is_master and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Asks the node daemon to reboot the node.

    """
    hv_type = self.cfg.GetHypervisorType()
    hv_params = self.cfg.GetClusterInfo().hvparams[hv_type]
    result = self.rpc.call_node_powercycle(self.op.node_uuid, hv_type,
                                           hv_params)
    result.Raise("Failed to schedule the reboot")
    return result.payload
841

    
842

    
843
def _GetNodeInstancesInner(cfg, fn):
  """Returns the configured instances satisfying the predicate C{fn}.

  """
  return list(filter(fn, cfg.GetAllInstancesInfo().values()))
845

    
846

    
847
def _GetNodePrimaryInstances(cfg, node_uuid):
  """Returns primary instances on a node.

  """
  def _HasPrimaryHere(inst):
    # An instance "belongs" to the node if the node is its primary
    return inst.primary_node == node_uuid

  return _GetNodeInstancesInner(cfg, _HasPrimaryHere)
853

    
854

    
855
def _GetNodeSecondaryInstances(cfg, node_uuid):
  """Returns secondary instances on a node.

  """
  def _HasSecondaryHere(inst):
    # The node only acts as a secondary for this instance
    return node_uuid in cfg.GetInstanceSecondaryNodes(inst)

  return _GetNodeInstancesInner(cfg, _HasSecondaryHere)
862

    
863

    
864
def _GetNodeInstances(cfg, node_uuid):
  """Returns a list of all primary and secondary instances on a node.

  """
  def _UsesNode(inst):
    # GetInstanceNodes covers both primary and secondary nodes
    return node_uuid in cfg.GetInstanceNodes(inst)

  return _GetNodeInstancesInner(cfg, _UsesNode)
872

    
873

    
874
class LUNodeEvacuate(NoHooksLU):
  """Evacuates instances off a list of nodes.

  Depending on the evacuation mode, either primary instances, secondary
  instances or both are moved away from the given node, either via an
  iallocator or to an explicitly given remote node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    CheckIAllocatorOrNode(self, "iallocator", "remote_node")

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)
      assert self.op.remote_node

      if self.op.node_uuid == self.op.remote_node_uuid:
        raise errors.OpPrereqError("Can not use evacuated node as a new"
                                   " secondary node", errors.ECODE_INVAL)

      if self.op.mode != constants.NODE_EVAC_SEC:
        raise errors.OpPrereqError("Without the use of an iallocator only"
                                   " secondary instances can be evacuated",
                                   errors.ECODE_INVAL)

    # Declare locks
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

    # Determine nodes (via group) optimistically, needs verification once locks
    # have been acquired
    self.lock_nodes = self._DetermineNodes()

  def _DetermineNodes(self):
    """Gets the list of node UUIDs to operate on.

    @rtype: set
    @return: UUIDs of the evacuated node and all potential target nodes

    """
    if self.op.remote_node is None:
      # Iallocator will choose any node(s) in the same group
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
    else:
      group_nodes = frozenset([self.op.remote_node_uuid])

    # Determine nodes to be locked
    return set([self.op.node_uuid]) | group_nodes

  def _DetermineInstances(self):
    """Builds list of instances to operate on.

    """
    assert self.op.mode in constants.NODE_EVAC_MODES

    if self.op.mode == constants.NODE_EVAC_PRI:
      # Primary instances only
      inst_fn = _GetNodePrimaryInstances
      assert self.op.remote_node is None, \
        "Evacuating primary instances requires iallocator"
    elif self.op.mode == constants.NODE_EVAC_SEC:
      # Secondary instances only
      inst_fn = _GetNodeSecondaryInstances
    else:
      # All instances
      assert self.op.mode == constants.NODE_EVAC_ALL
      inst_fn = _GetNodeInstances
      # TODO: In 2.6, change the iallocator interface to take an evacuation mode
      # per instance
      raise errors.OpPrereqError("Due to an issue with the iallocator"
                                 " interface it is not possible to evacuate"
                                 " all instances at once; specify explicitly"
                                 " whether to evacuate primary or secondary"
                                 " instances",
                                 errors.ECODE_INVAL)

    return inst_fn(self.cfg, self.op.node_uuid)

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        set(i.name for i in self._DetermineInstances())

    elif level == locking.LEVEL_NODEGROUP:
      # Lock node groups for all potential target nodes optimistically, needs
      # verification once nodes have been acquired
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)

    elif level == locking.LEVEL_NODE:
      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    Re-determines nodes, groups and instances after the locks were
    acquired and aborts if anything changed in the meantime.

    """
    # Verify locks
    owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)

    need_nodes = self._DetermineNodes()

    if not owned_nodes.issuperset(need_nodes):
      # FIX: the original message read "current nodes are are '%s'"
      # (duplicated word); corrected here
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
                                 " locks were acquired, current nodes are"
                                 " '%s', used to be '%s'; retry the"
                                 " operation" %
                                 (self.op.node_name,
                                  utils.CommaJoin(need_nodes),
                                  utils.CommaJoin(owned_nodes)),
                                 errors.ECODE_STATE)

    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
    if owned_groups != wanted_groups:
      raise errors.OpExecError("Node groups changed since locks were acquired,"
                               " current groups are '%s', used to be '%s';"
                               " retry the operation" %
                               (utils.CommaJoin(wanted_groups),
                                utils.CommaJoin(owned_groups)))

    # Determine affected instances
    self.instances = self._DetermineInstances()
    self.instance_names = [i.name for i in self.instances]

    if set(self.instance_names) != owned_instance_names:
      raise errors.OpExecError("Instances on node '%s' changed since locks"
                               " were acquired, current instances are '%s',"
                               " used to be '%s'; retry the operation" %
                               (self.op.node_name,
                                utils.CommaJoin(self.instance_names),
                                utils.CommaJoin(owned_instance_names)))

    if self.instance_names:
      self.LogInfo("Evacuating instances from node '%s': %s",
                   self.op.node_name,
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
    else:
      self.LogInfo("No instances to evacuate from node '%s'",
                   self.op.node_name)

    if self.op.remote_node is not None:
      for i in self.instances:
        if i.primary_node == self.op.remote_node_uuid:
          raise errors.OpPrereqError("Node %s is the primary node of"
                                     " instance %s, cannot use it as"
                                     " secondary" %
                                     (self.op.remote_node, i.name),
                                     errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Builds and returns the evacuation jobs.

    """
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)

    if not self.instance_names:
      # No instances to evacuate
      jobs = []

    elif self.op.iallocator is not None:
      # TODO: Implement relocation to other group
      req = iallocator.IAReqNodeEvac(evac_mode=self.op.mode,
                                     instances=list(self.instance_names))
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)

      ial.Run(self.op.iallocator)

      if not ial.success:
        raise errors.OpPrereqError("Can't compute node evacuation using"
                                   " iallocator '%s': %s" %
                                   (self.op.iallocator, ial.info),
                                   errors.ECODE_NORES)

      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)

    elif self.op.remote_node is not None:
      assert self.op.mode == constants.NODE_EVAC_SEC
      jobs = [
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
                                        remote_node=self.op.remote_node,
                                        disks=[],
                                        mode=constants.REPLACE_DISK_CHG,
                                        early_release=self.op.early_release)]
        for instance_name in self.instance_names]

    else:
      raise errors.ProgrammerError("No iallocator or remote node")

    return ResultWithJobs(jobs)
1064

    
1065

    
1066
class LUNodeMigrate(LogicalUnit):
  """Migrate all instances from a node.

  """
  HPATH = "node-migrate"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False

  def CheckArguments(self):
    # Nothing to validate at this stage
    pass

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    return {
      "NODE_NAME": self.op.node_name,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    master = [self.cfg.GetMasterNode()]
    return (master, master)

  def CheckPrereq(self):
    # No prerequisites beyond what ExpandNames already resolved
    pass

  def Exec(self, feedback_fn):
    """Submits one migration job per primary instance on the node.

    """
    jobs = [
      [opcodes.OpInstanceMigrate(
        instance_name=inst.name,
        mode=self.op.mode,
        live=self.op.live,
        iallocator=self.op.iallocator,
        target_node=self.op.target_node,
        allow_runtime_changes=self.op.allow_runtime_changes,
        ignore_ipolicy=self.op.ignore_ipolicy)]
      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]

    # TODO: Run iallocator in this opcode and pass correct placement options to
    # OpInstanceMigrate. Since other jobs can modify the cluster between
    # running the iallocator and the actual migration, a good consistency model
    # will have to be found.

    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset([self.op.node_uuid]))

    return ResultWithJobs(jobs)
1129

    
1130

    
1131
def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    return [[cfg.GetFileStorageDir()]]

  if storage_type == constants.ST_SHARED_FILE:
    # Collect the directories of all enabled shared-file-like templates
    enabled = cfg.GetClusterInfo().enabled_disk_templates
    dirs = []
    if constants.DT_SHARED_FILE in enabled:
      dirs.append(cfg.GetSharedFileStorageDir())
    if constants.DT_GLUSTER in enabled:
      dirs.append(cfg.GetGlusterStorageDir())
    return [dirs]

  # All other storage types take no arguments
  return []
1149

    
1150

    
1151
class LUNodeModifyStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    # Reject any requested field that is not modifiable for this type
    unsupported = set(self.op.changes) - modifiable
    if unsupported:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(unsupported)),
                                 errors.ECODE_INVAL)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_uuid,
      }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_uuid,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))
1198

    
1199

    
1200
def _CheckOutputFields(fields, selected):
  """Checks whether all selected fields are valid according to fields.

  @type fields: L{utils.FieldSet}
  @param fields: fields set
  @type selected: L{utils.FieldSet}
  @param selected: fields set

  """
  unknown = fields.NonMatching(selected)
  if unknown:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(unknown), errors.ECODE_INVAL)
1213

    
1214

    
1215
class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(constants.VF_NODE, constants.VF_PHYS,
                                      constants.VF_VG, constants.VF_NAME,
                                      constants.VF_SIZE, constants.VF_INSTANCE),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      # Only the explicitly requested nodes
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      # All nodes in the cluster
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    volumes = self.rpc.call_node_volumes(node_uuids)

    instances = self.cfg.GetAllInstancesInfo()
    vol2inst = MapInstanceLvsToNodes(instances.values())

    output = []
    for node_uuid in node_uuids:
      node_result = volumes[node_uuid]
      if node_result.offline:
        continue
      fail_msg = node_result.fail_msg
      if fail_msg:
        self.LogWarning("Can't compute volume data on node %s: %s",
                        self.cfg.GetNodeName(node_uuid), fail_msg)
        continue

      # Sort by device name for a stable ordering of the output
      node_vols = sorted(node_result.payload,
                         key=operator.itemgetter(constants.VF_DEV))

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == constants.VF_NODE:
            val = self.cfg.GetNodeName(node_uuid)
          elif field == constants.VF_PHYS:
            val = vol[constants.VF_DEV]
          elif field == constants.VF_VG:
            val = vol[constants.VF_VG]
          elif field == constants.VF_NAME:
            val = vol[constants.VF_NAME]
          elif field == constants.VF_SIZE:
            val = int(float(vol[constants.VF_SIZE]))
          elif field == constants.VF_INSTANCE:
            # Map "vg/lv" on this node back to the owning instance, if any
            inst = vol2inst.get((node_uuid, vol[constants.VF_VG] + "/" +
                                 vol[constants.VF_NAME]), None)
            val = inst.name if inst is not None else "-"
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output
1291

    
1292

    
1293
class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def _DetermineStorageType(self):
    """Determines the default storage type of the cluster.

    """
    enabled_disk_templates = self.cfg.GetClusterInfo().enabled_disk_templates
    return constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[enabled_disk_templates[0]]

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.storage_type:
      CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()
      supported_storage_types = constants.STS_REPORT_NODE_STORAGE
      if self.storage_type not in supported_storage_types:
        raise errors.OpPrereqError(
            "Storage reporting for storage type '%s' is not supported. Please"
            " use the --storage-type option to specify one of the supported"
            " storage types (%s) or set the default disk template to one that"
            " supports storage reporting." %
            (self.storage_type, utils.CommaJoin(supported_storage_types)))

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    if self.op.storage_type:
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()

    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)

    # Always query the name field too, rows are keyed by it below
    if constants.SF_NAME in self.op.output_fields:
      query_fields = self.op.output_fields[:]
    else:
      query_fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask the backend for node or type, only the LU knows those
    for synthetic in [constants.SF_NODE, constants.SF_TYPE]:
      while synthetic in query_fields:
        query_fields.remove(synthetic)

    field_idx = dict((name, idx) for (idx, name) in enumerate(query_fields))
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.storage_type, st_args,
                                      self.op.name, query_fields)

    result = []

    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      node_result = data[node_uuid]
      if node_result.offline:
        continue

      fail_msg = node_result.fail_msg
      if fail_msg:
        self.LogWarning("Can't get storage data from node %s: %s",
                        node_name, fail_msg)
        continue

      # Key each returned row by its storage unit name for sorting
      rows = dict((row[name_idx], row) for row in node_result.payload)

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]
        out = []
        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node_name
          elif field == constants.SF_TYPE:
            val = self.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)
          out.append(val)

        result.append(out)

    return result
1409

    
1410

    
1411
class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_uuid)
    except ValueError:
      # Node already not in the list; nothing to do
      pass
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    assert node is not None

    if node.uuid == self.cfg.GetMasterNode():
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)

    for instance in self.cfg.GetAllInstancesInfo().values():
      if node.uuid in self.cfg.GetInstanceNodes(instance):
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first" % instance.name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    logging.info("Stopping the node daemon and removing configs from node %s",
                 self.node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # Promote nodes to master candidate as needed
    AdjustCandidatePool(self, [self.node.uuid])
    self.context.RemoveNode(self.node)

    # Run post hooks on the node before it's removed
    RunPostHook(self, self.node.name)

    # we have to call this by name rather than by UUID, as the node is no longer
    # in the config
    result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
    fail_msg = result.fail_msg
    if fail_msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", fail_msg)

    cluster = self.cfg.GetClusterInfo()

    # Remove node from candidate certificate list
    if self.node.master_candidate:
      self.cfg.RemoveNodeFromCandidateCerts(self.node.uuid)

    # Remove node from our /etc/hosts
    if cluster.modify_etc_hosts:
      master_node_uuid = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node_uuid,
                                              constants.ETC_HOSTS_REMOVE,
                                              self.node.name, None)
      result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)
1511

    
1512

    
1513
class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    """Resolve the node and verify the storage type is repairable.

    @raise errors.OpPrereqError: if the given storage type does not support
        the consistency-fix operation

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    if (constants.SO_FIX_CONSISTENCY not in
        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " repaired" % storage_type,
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    # Only the target node needs to be locked
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def _CheckFaultyDisks(self, instance, node_uuid):
    """Ensure faulty disks abort the opcode or at least warn."""
    try:
      if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                 node_uuid, True):
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                   " node '%s'" %
                                   (instance.name,
                                    self.cfg.GetNodeName(node_uuid)),
                                   errors.ECODE_STATE)
    # FIX: "except E, err" is Python 2-only syntax (a syntax error on
    # Python 3); "except E as err" is equivalent and works on 2.6+/3.x
    except errors.OpPrereqError as err:
      if self.op.ignore_consistency:
        # Best-effort mode: downgrade the failure to a warning
        self.LogWarning(str(err.args[0]))
      else:
        raise

  def CheckPrereq(self):
    """Check prerequisites.

    @raise errors.OpPrereqError: if the storage type is not enabled for the
        cluster, or an instance on the node has faulty disks (and
        C{ignore_consistency} is not set)

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

    # Check whether any instance on this node has faulty disks
    for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
      if not inst.disks_active:
        continue
      # Inspect the instance's other nodes, not the one being repaired
      check_nodes = set(self.cfg.GetInstanceNodes(inst))
      check_nodes.discard(self.op.node_uuid)
      for inst_node_uuid in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_uuid)

  def Exec(self, feedback_fn):
    """Execute the consistency repair on the target node via RPC.

    """
    feedback_fn("Repairing storage unit '%s' on %s ..." %
                (self.op.name, self.op.node_name))

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_execute(self.op.node_uuid,
                                           self.op.storage_type, st_args,
                                           self.op.name,
                                           constants.SO_FIX_CONSISTENCY)
    result.Raise("Failed to repair storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))