#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with nodes."""

import logging
import operator

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
import ganeti.rpc.node as rpc
from ganeti import utils
from ganeti.masterd import iallocator

from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, ResultWithJobs
from ganeti.cmdlib.common import CheckParamsNotGlobal, \
  MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  IsExclusiveStorageEnabledNode, CheckNodePVs, \
  RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
  CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
  AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
  GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
  FindFaultyInstanceDisks, CheckStorageTypeEnabled, CreateNewClientCert, \
  AddNodeCertToCandidateCerts, RemoveNodeCertFromCandidateCerts


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max with one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should
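
# Worked example for _DecideSelfPromotion (hypothetical numbers, for
# illustration only): with candidate_pool_size = 10 and
# GetMasterCandidateStats() reporting mc_now = 3 current candidates and a
# target of mc_should = 4, the node being added raises the target to
# min(4 + 1, 10) = 5; since 3 < 5, the caller promotes itself.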


def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: L{objects.Node}
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip,
      and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  # this can be called with a new node, which has no UUID yet, so perform the
  # RPC call using its name
  result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node.name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)


class LUNodeAdd(LogicalUnit):
  """Logical unit for adding a node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _NFLAGS = ["master_capable", "vm_capable"]

  def CheckArguments(self):
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
    # validate/normalize the node name
    self.hostname = netutils.GetHostname(name=self.op.node_name,
                                         family=self.primary_ip_family)
    self.op.node_name = self.hostname.name

    if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
      raise errors.OpPrereqError("Cannot readd the master node",
                                 errors.ECODE_STATE)

    if self.op.readd and self.op.group:
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                 " being readded", errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    hook_nodes = self.cfg.GetNodeList()
    new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
    if new_node_info is not None:
      # Exclude added node
      hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))

    # add the new node as post hook node by name; it does not have a UUID yet
    return (hook_nodes, hook_nodes)

  def PreparePostHookNodes(self, post_hook_node_uuids):
    return post_hook_node_uuids + [self.new_node.uuid]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.hostname.name
    self.op.primary_ip = self.hostname.ip
    if self.op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using an IPv6 primary address, a"
                                   " valid IPv4 address must be given as"
                                   " secondary", errors.ECODE_INVAL)
      self.op.secondary_ip = self.op.primary_ip

    secondary_ip = self.op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip, errors.ECODE_INVAL)

    existing_node_info = self.cfg.GetNodeInfoByName(node_name)
    if not self.op.readd and existing_node_info is not None:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node_name, errors.ECODE_EXISTS)
    elif self.op.readd and existing_node_info is None:
      raise errors.OpPrereqError("Node %s is not in the configuration" %
                                 node_name, errors.ECODE_NOENT)

    self.changed_primary_ip = False

    for existing_node in self.cfg.GetAllNodesInfo().values():
      if self.op.readd and node_name == existing_node.name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != self.op.primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == self.op.primary_ip or
          existing_node.secondary_ip == self.op.primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # After this 'if' block, None is no longer a valid value for the
    # _capable op attributes
    if self.op.readd:
      assert existing_node_info is not None, \
        "Can't retrieve locked node %s" % node_name
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, getattr(existing_node_info, attr))
    else:
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, True)

    if self.op.readd and not self.op.vm_capable:
      pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % node_name,
                                   errors.ECODE_STATE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = self.cfg.GetMasterNodeInfo()
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == self.op.primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                              source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [existing_node_info.uuid]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False

    node_group = self.cfg.LookupNodeGroup(self.op.group)

    if self.op.readd:
      self.new_node = existing_node_info
    else:
      self.new_node = objects.Node(name=node_name,
                                   primary_ip=self.op.primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=node_group, ndparams={})

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)

    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
    #       it a property on the base class.
    rpcrunner = rpc.DnsOnlyRunner()
    result = rpcrunner.call_version([node_name])[node_name]
    result.Raise("Can't get version information from node %s" % node_name)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node_name, result.payload)
    else:
      raise errors.OpPrereqError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.payload),
                                 errors.ECODE_ENVIRON)

    vg_name = self.cfg.GetVGName()
    if vg_name is not None:
      vparams = {constants.NV_PVLIST: [vg_name]}
      excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
      cname = self.cfg.GetClusterName()
      result = rpcrunner.call_node_verify_light(
          [node_name], vparams, cname,
          self.cfg.GetClusterInfo().hvparams,
          {node_name: node_group},
          self.cfg.GetAllNodeGroupsInfoDict()
        )[node_name]
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
      if errmsgs:
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)

  def _InitOpenVSwitch(self):
    filled_ndparams = self.cfg.GetClusterInfo().FillND(
      self.new_node, self.cfg.GetNodeGroup(self.new_node.group))

    ovs = filled_ndparams.get(constants.ND_OVS, None)
    ovs_name = filled_ndparams.get(constants.ND_OVS_NAME, None)
    ovs_link = filled_ndparams.get(constants.ND_OVS_LINK, None)

    if ovs:
      if not ovs_link:
        self.LogInfo("No physical interface for OpenvSwitch was given."
                     " OpenvSwitch will not have an outside connection. This"
                     " might not be what you want.")

      result = self.rpc.call_node_configure_ovs(
                 self.new_node.name, ovs_name, ovs_link)
      result.Raise("Failed to initialize OpenVSwitch on new node")

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # We are adding a new node, so we assume it's powered
    self.new_node.powered = True

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      self.new_node.offline = False
      self.new_node.drained = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      self.new_node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        self.new_node.primary_ip = self.op.primary_ip

    # copy the master/vm_capable flags
    for attr in self._NFLAGS:
      setattr(self.new_node, attr, getattr(self.op, attr))

    # notify the user about any possible mc promotion
    if self.new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    if self.op.ndparams:
      self.new_node.ndparams = self.op.ndparams
    else:
      self.new_node.ndparams = {}

    if self.op.hv_state:
      self.new_node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.new_node.disk_state_static = self.new_disk_state

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(
                 master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
                 self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    if self.new_node.secondary_ip != self.new_node.primary_ip:
      _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
                               False)

    node_verifier_uuids = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: ([self.new_node.name], {}),
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(
               node_verifier_uuids, node_verify_param,
               self.cfg.GetClusterName(),
               self.cfg.GetClusterInfo().hvparams,
               {self.new_node.name: self.cfg.LookupNodeGroup(self.op.group)},
               self.cfg.GetAllNodeGroupsInfoDict()
               )
    for verifier in node_verifier_uuids:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed")

    self._InitOpenVSwitch()

    if self.op.readd:
      self.context.ReaddNode(self.new_node)
      RedistributeAncillaryFiles(self)
      # make sure we redistribute the config
      self.cfg.Update(self.new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not self.new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
        result.Warn("Node failed to demote itself from master candidate status",
                    self.LogWarning)
    else:
      self.context.AddNode(self.new_node, self.proc.GetECId())
      RedistributeAncillaryFiles(self)

    cluster = self.cfg.GetClusterInfo()
    # We create a new certificate even if the node is readded
    digest = CreateNewClientCert(self, self.new_node.uuid)
    if self.new_node.master_candidate:
      utils.AddNodeToCandidateCerts(self.new_node.uuid, digest,
                                    cluster.candidate_certs)
      self.cfg.Update(cluster, feedback_fn)
    else:
      if self.new_node.uuid in cluster.candidate_certs:
        utils.RemoveNodeFromCandidateCerts(self.new_node.uuid,
                                           cluster.candidate_certs)
        self.cfg.Update(cluster, feedback_fn)
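
    # Note on the block above: cluster.candidate_certs is used here as a
    # map of node UUIDs to client-certificate digests, and the
    # AddNodeToCandidateCerts/RemoveNodeFromCandidateCerts helpers only
    # mutate that in-memory map, which is why each branch persists the
    # change with an explicit cfg.Update(cluster, feedback_fn).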


class LUNodeSetParams(LogicalUnit):
  """Modifies the parameters of a node.

  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
      to the node role (as _ROLE_*)
  @cvar _R2F: a dictionary from node role to tuples of flags
  @cvar _FLAGS: a list of attribute names corresponding to the flags

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
  _F2R = {
    (True, False, False): _ROLE_CANDIDATE,
    (False, True, False): _ROLE_DRAINED,
    (False, False, True): _ROLE_OFFLINE,
    (False, False, False): _ROLE_REGULAR,
    }
  _R2F = dict((v, k) for k, v in _F2R.items())
  _FLAGS = ["master_candidate", "drained", "offline"]
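
  # Illustration of the role tables above:
  #   _F2R[(True, False, False)] == _ROLE_CANDIDATE
  #   _R2F[_ROLE_OFFLINE] == (False, False, True)
  # i.e. a role is a canonical name for at most one of the _FLAGS
  # attributes being set (none set means _ROLE_REGULAR).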

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
                self.op.master_capable, self.op.vm_capable,
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
                self.op.disk_state]
    if all_mods.count(None) == len(all_mods):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # Boolean value that tells us whether we might be demoting from MC
    self.might_demote = (self.op.master_candidate is False or
                         self.op.offline is True or
                         self.op.drained is True or
                         self.op.master_capable is False)

    if self.op.secondary_ip:
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                   " address" % self.op.secondary_ip,
                                   errors.ECODE_INVAL)

    self.lock_all = self.op.auto_promote and self.might_demote
    self.lock_instances = self.op.secondary_ip is not None

  def _InstanceFilter(self, instance):
    """Filter for getting affected instances.

    """
    return (instance.disk_template in constants.DTS_INT_MIRROR and
            self.op.node_uuid in instance.all_nodes)

  def ExpandNames(self):
    if self.lock_all:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,

        # Block allocations when all nodes are locked
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: self.op.node_uuid,
        }

    # Since modifying a node can have severe effects on currently running
    # operations the resource lock is at least acquired in shared mode
    self.needed_locks[locking.LEVEL_NODE_RES] = \
      self.needed_locks[locking.LEVEL_NODE]

    # Get all locks except nodes in shared mode; they are not used for anything
    # but read-only access
    self.share_locks = ShareAll()
    self.share_locks[locking.LEVEL_NODE] = 0
    self.share_locks[locking.LEVEL_NODE_RES] = 0
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 0

    if self.lock_instances:
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(
          self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    if self.lock_instances:
      affected_instances = \
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)

      # Verify instance locks
      owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
      wanted_instance_names = frozenset([inst.name for inst in
                                         affected_instances.values()])
      if wanted_instance_names - owned_instance_names:
        raise errors.OpPrereqError("Instances affected by changing node %s's"
                                   " secondary IP address have changed since"
                                   " locks were acquired, wanted '%s', have"
                                   " '%s'; retry the operation" %
                                   (node.name,
                                    utils.CommaJoin(wanted_instance_names),
                                    utils.CommaJoin(owned_instance_names)),
                                   errors.ECODE_STATE)
    else:
      affected_instances = None

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if node.uuid == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via master-failover",
                                   errors.ECODE_INVAL)

    if self.op.master_candidate and not node.master_capable:
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
                                 " it a master candidate" % node.name,
                                 errors.ECODE_STATE)

    if self.op.vm_capable is False:
      (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
      if ipri or isec:
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
                                   " the vm_capable flag" % node.name,
                                   errors.ECODE_STATE)

    if node.master_candidate and self.might_demote and not self.lock_all:
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
      # check if after removing the current node, we're missing master
      # candidates
      (mc_remaining, mc_should, _) = \
          self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
      if mc_remaining < mc_should:
        raise errors.OpPrereqError("Not enough master candidates, please"
                                   " pass auto promote option to allow"
                                   " promotion (--auto-promote or RAPI"
                                   " auto_promote=True)", errors.ECODE_STATE)

    self.old_flags = old_flags = (node.master_candidate,
                                  node.drained, node.offline)
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
    self.old_role = old_role = self._F2R[old_flags]

    # Check for ineffective changes
    for attr in self._FLAGS:
      if getattr(self.op, attr) is False and getattr(node, attr) is False:
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
        setattr(self.op, attr, None)

    # Past this point, any flag change to False means a transition
    # away from the respective state, as only real changes are kept

    # TODO: We might query the real power state if it supports OOB
    if SupportsOob(self.cfg, node):
      if self.op.offline is False and not (node.powered or
                                           self.op.powered is True):
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
                                    " offline status can be reset") %
                                   self.op.node_name, errors.ECODE_STATE)
    elif self.op.powered is not None:
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
                                  " as it does not support out-of-band"
                                  " handling") % self.op.node_name,
                                 errors.ECODE_STATE)

    # If we're being deofflined/drained, we'll MC ourself if needed
    if (self.op.drained is False or self.op.offline is False or
        (self.op.master_capable and not node.master_capable)):
      if _DecideSelfPromotion(self):
        self.op.master_candidate = True
        self.LogInfo("Auto-promoting node to master candidate")

    # If we're no longer master capable, we'll demote ourselves from MC
    if self.op.master_capable is False and node.master_candidate:
      self.LogInfo("Demoting from master candidate")
      self.op.master_candidate = False

    # Compute new role
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
    if self.op.master_candidate:
      new_role = self._ROLE_CANDIDATE
    elif self.op.drained:
      new_role = self._ROLE_DRAINED
    elif self.op.offline:
      new_role = self._ROLE_OFFLINE
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
      # False is still in new flags, which means we're un-setting (the
      # only) True flag
      new_role = self._ROLE_REGULAR
    else: # no new flags, nothing, keep old role
      new_role = old_role
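
    # Example (hypothetical): a drained node has old_flags
    # (False, True, False), i.e. _ROLE_DRAINED. An opcode with offline=True
    # picks _ROLE_OFFLINE above; one passing only drained=False falls into
    # the "False in new flags" branch and yields _ROLE_REGULAR.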

    self.new_role = new_role

    if old_role == self._ROLE_OFFLINE and new_role != old_role:
      # Trying to transition out of offline status
      result = self.rpc.call_version([node.uuid])[node.uuid]
      if result.fail_msg:
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
                                   " to report its version: %s" %
                                   (node.name, result.fail_msg),
                                   errors.ECODE_STATE)
      else:
        self.LogWarning("Transitioning node from offline to online state"
                        " without using re-add. Please make sure the node"
                        " is healthy!")

    # When changing the secondary ip, verify if this is a single-homed to
    # multi-homed transition or vice versa, and apply the relevant
    # restrictions.
    if self.op.secondary_ip:
      # Ok even without locking, because this can't be changed by any LU
      master = self.cfg.GetMasterNodeInfo()
      master_singlehomed = master.secondary_ip == master.primary_ip
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from single-homed to multi-homed"
                          " cluster; all nodes will require a secondary IP"
                          " address")
        else:
          raise errors.OpPrereqError("Changing the secondary ip on a"
                                     " single-homed cluster requires the"
                                     " --force option to be passed, and the"
                                     " target node to be the master",
                                     errors.ECODE_INVAL)
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from multi-homed to single-homed"
                          " cluster; secondary IP addresses will have to be"
                          " removed")
        else:
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
                                     " same as the primary IP on a multi-homed"
                                     " cluster, unless the --force option is"
                                     " passed, and the target node is the"
                                     " master", errors.ECODE_INVAL)

      assert not (set([inst.name for inst in affected_instances.values()]) -
                  self.owned_locks(locking.LEVEL_INSTANCE))

      if node.offline:
        if affected_instances:
          msg = ("Cannot change secondary IP address: offline node has"
                 " instances (%s) configured to use it" %
                 utils.CommaJoin(
                   [inst.name for inst in affected_instances.values()]))
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
      else:
        # On online nodes, check that no instances are running, and that
        # the node has the new ip and we can reach it.
        for instance in affected_instances.values():
          CheckInstanceState(self, instance, INSTANCE_DOWN,
                             msg="cannot change secondary ip")

        _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
        if master.uuid != node.uuid:
          # check reachability from master secondary ip to new secondary ip
          if not netutils.TcpPing(self.op.secondary_ip,
                                  constants.DEFAULT_NODED_PORT,
                                  source=master.secondary_ip):
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                       " based ping to node daemon port",
                                       errors.ECODE_ENVIRON)

    if self.op.ndparams:
      new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")
      self.new_ndparams = new_ndparams

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                                node.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    result = []

    if self.op.ndparams:
      node.ndparams = self.new_ndparams

    if self.op.powered is not None:
      node.powered = self.op.powered

    if self.op.hv_state:
      node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      node.disk_state_static = self.new_disk_state

    for attr in ["master_capable", "vm_capable"]:
      val = getattr(self.op, attr)
      if val is not None:
        setattr(node, attr, val)
        result.append((attr, str(val)))

    if self.new_role != self.old_role:
      # Tell the node to demote itself, if no longer MC and not offline
      if self.old_role == self._ROLE_CANDIDATE and \
          self.new_role != self._ROLE_OFFLINE:
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s", msg)

      new_flags = self._R2F[self.new_role]
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
        if of != nf:
          result.append((desc, str(nf)))
      (node.master_candidate, node.drained, node.offline) = new_flags

      # if we locked all nodes, we adjust the CP before updating this node
      if self.lock_all:
        AdjustCandidatePool(self, [node.uuid], feedback_fn)

      cluster = self.cfg.GetClusterInfo()
      # if the node gets promoted, grant RPC privileges
      if self.new_role == self._ROLE_CANDIDATE:
        AddNodeCertToCandidateCerts(self, node.uuid, cluster)
      # if the node is demoted, revoke RPC privileges
      if self.old_role == self._ROLE_CANDIDATE:
        RemoveNodeCertFromCandidateCerts(node.uuid, cluster)

    if self.op.secondary_ip:
      node.secondary_ip = self.op.secondary_ip
      result.append(("secondary_ip", self.op.secondary_ip))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)

    # this will trigger job queue propagation or cleanup if the mc
    # flag changed
    if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
      self.context.ReaddNode(node)

    return result


class LUNodePowercycle(NoHooksLU):
  """Powercycles a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    default_hypervisor = self.cfg.GetHypervisorType()
    hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
    result = self.rpc.call_node_powercycle(self.op.node_uuid,
                                           default_hypervisor,
                                           hvparams)
    result.Raise("Failed to schedule the reboot")
    return result.payload


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodePrimaryInstances(cfg, node_uuid):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_uuid):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid in inst.secondary_nodes)


def _GetNodeInstances(cfg, node_uuid):
  """Returns a list of all primary and secondary instances on a node.

  """

  return _GetNodeInstancesInner(cfg, lambda inst: node_uuid in inst.all_nodes)
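
# Illustration (hypothetical cluster): if instance "inst1" has primary node
# N1 and secondary node N2, _GetNodePrimaryInstances(cfg, N1) and
# _GetNodeSecondaryInstances(cfg, N2) both return [inst1], and
# _GetNodeInstances(cfg, n) returns it for either node, since all_nodes
# covers the primary plus all secondaries.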


class LUNodeEvacuate(NoHooksLU):
  """Evacuates instances off a list of nodes.

  """
  REQ_BGL = False

  def CheckArguments(self):
    CheckIAllocatorOrNode(self, "iallocator", "remote_node")

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)
      assert self.op.remote_node

      if self.op.node_uuid == self.op.remote_node_uuid:
        raise errors.OpPrereqError("Can not use evacuated node as a new"
                                   " secondary node", errors.ECODE_INVAL)

      if self.op.mode != constants.NODE_EVAC_SEC:
        raise errors.OpPrereqError("Without the use of an iallocator only"
                                   " secondary instances can be evacuated",
                                   errors.ECODE_INVAL)

    # Declare locks
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

    # Determine nodes (via group) optimistically, needs verification once locks
    # have been acquired
    self.lock_nodes = self._DetermineNodes()

  def _DetermineNodes(self):
    """Gets the list of node UUIDs to operate on.

    """
    if self.op.remote_node is None:
      # Iallocator will choose any node(s) in the same group
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
    else:
      group_nodes = frozenset([self.op.remote_node_uuid])

    # Determine nodes to be locked
    return set([self.op.node_uuid]) | group_nodes

  def _DetermineInstances(self):
    """Builds list of instances to operate on.

    """
    assert self.op.mode in constants.NODE_EVAC_MODES

    if self.op.mode == constants.NODE_EVAC_PRI:
      # Primary instances only
      inst_fn = _GetNodePrimaryInstances
      assert self.op.remote_node is None, \
        "Evacuating primary instances requires iallocator"
    elif self.op.mode == constants.NODE_EVAC_SEC:
      # Secondary instances only
      inst_fn = _GetNodeSecondaryInstances
    else:
      # All instances
      assert self.op.mode == constants.NODE_EVAC_ALL
      inst_fn = _GetNodeInstances
      # TODO: In 2.6, change the iallocator interface to take an evacuation mode
      # per instance
      raise errors.OpPrereqError("Due to an issue with the iallocator"
                                 " interface it is not possible to evacuate"
                                 " all instances at once; specify explicitly"
                                 " whether to evacuate primary or secondary"
                                 " instances",
                                 errors.ECODE_INVAL)

    return inst_fn(self.cfg, self.op.node_uuid)

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        set(i.name for i in self._DetermineInstances())

    elif level == locking.LEVEL_NODEGROUP:
      # Lock node groups for all potential target nodes optimistically, needs
      # verification once nodes have been acquired
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)

    elif level == locking.LEVEL_NODE:
      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes

  def CheckPrereq(self):
    # Verify locks
    owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)

    need_nodes = self._DetermineNodes()

    if not owned_nodes.issuperset(need_nodes):
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
                                 " locks were acquired, current nodes are"
                                 " '%s', used to be '%s'; retry the"
                                 " operation" %
                                 (self.op.node_name,
                                  utils.CommaJoin(need_nodes),
                                  utils.CommaJoin(owned_nodes)),
                                 errors.ECODE_STATE)

    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
    if owned_groups != wanted_groups:
      raise errors.OpExecError("Node groups changed since locks were acquired,"
                               " current groups are '%s', used to be '%s';"
                               " retry the operation" %
                               (utils.CommaJoin(wanted_groups),
                                utils.CommaJoin(owned_groups)))

    # Determine affected instances
    self.instances = self._DetermineInstances()
    self.instance_names = [i.name for i in self.instances]

    if set(self.instance_names) != owned_instance_names:
      raise errors.OpExecError("Instances on node '%s' changed since locks"
                               " were acquired, current instances are '%s',"
                               " used to be '%s'; retry the operation" %
                               (self.op.node_name,
                                utils.CommaJoin(self.instance_names),
                                utils.CommaJoin(owned_instance_names)))

    if self.instance_names:
      self.LogInfo("Evacuating instances from node '%s': %s",
                   self.op.node_name,
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
    else:
      self.LogInfo("No instances to evacuate from node '%s'",
                   self.op.node_name)

    if self.op.remote_node is not None:
      for i in self.instances:
        if i.primary_node == self.op.remote_node_uuid:
          raise errors.OpPrereqError("Node %s is the primary node of"
                                     " instance %s, cannot use it as"
                                     " secondary" %
                                     (self.op.remote_node, i.name),
                                     errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)

    if not self.instance_names:
      # No instances to evacuate
      jobs = []

    elif self.op.iallocator is not None:
      # TODO: Implement relocation to other group
      req = iallocator.IAReqNodeEvac(evac_mode=self.op.mode,
                                     instances=list(self.instance_names))
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)

      ial.Run(self.op.iallocator)

      if not ial.success:
        raise errors.OpPrereqError("Can't compute node evacuation using"
                                   " iallocator '%s': %s" %
                                   (self.op.iallocator, ial.info),
                                   errors.ECODE_NORES)

      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)

    elif self.op.remote_node is not None:
      assert self.op.mode == constants.NODE_EVAC_SEC
      jobs = [
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
                                        remote_node=self.op.remote_node,
                                        disks=[],
                                        mode=constants.REPLACE_DISK_CHG,
                                        early_release=self.op.early_release)]
        for instance_name in self.instance_names]

    else:
      raise errors.ProgrammerError("No iallocator or remote node")

    return ResultWithJobs(jobs)


class LUNodeMigrate(LogicalUnit):
  """Migrate all instances from a node.

  """
  HPATH = "node-migrate"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False

  def CheckArguments(self):
    pass

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    return {
      "NODE_NAME": self.op.node_name,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()]
    return (nl, nl)

  def CheckPrereq(self):
    pass

  def Exec(self, feedback_fn):
    # Prepare jobs for migrating instances
    jobs = [
      [opcodes.OpInstanceMigrate(
        instance_name=inst.name,
        mode=self.op.mode,
        live=self.op.live,
        iallocator=self.op.iallocator,
        target_node=self.op.target_node,
        allow_runtime_changes=self.op.allow_runtime_changes,
        ignore_ipolicy=self.op.ignore_ipolicy)]
      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]

    # TODO: Run iallocator in this opcode and pass correct placement options to
    # OpInstanceMigrate. Since other jobs can modify the cluster between
    # running the iallocator and the actual migration, a good consistency model
    # will have to be found.

    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset([self.op.node_uuid]))

    return ResultWithJobs(jobs)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage

  if storage_type == constants.ST_FILE:
    return [[cfg.GetFileStorageDir()]]
  elif storage_type == constants.ST_SHARED_FILE:
    dts = cfg.GetClusterInfo().enabled_disk_templates
    paths = []
    if constants.DT_SHARED_FILE in dts:
      paths.append(cfg.GetSharedFileStorageDir())
    if constants.DT_GLUSTER in dts:
      paths.append(cfg.GetGlusterStorageDir())
    return [paths]
  else:
    return []
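
# The value returned above is the extra positional-argument list for the
# storage RPCs: e.g. [["/srv/ganeti/file-storage"]] (a hypothetical
# directory) for ST_FILE, [paths] with the enabled shared-file/Gluster
# directories for ST_SHARED_FILE, and [] for every other storage type.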


class LUNodeModifyStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_uuid,
      }

  def Exec(self, feedback_fn):
    """Modifies a storage volume on a node.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_uuid,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


def _CheckOutputFields(fields, selected):
  """Checks whether all selected fields are valid according to fields.

  @type fields: L{utils.FieldSet}
  @param fields: fields set
  @type selected: L{utils.FieldSet}
  @param selected: fields set

  """
  delta = fields.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)
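
# Usage sketch for _CheckOutputFields (hypothetical field names): with
# fields = utils.FieldSet("node", "name") and selected = ["name", "bogus"],
# NonMatching() returns ["bogus"] and an OpPrereqError listing the unknown
# field is raised; an empty delta means all selected fields are valid.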


class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(constants.VF_NODE, constants.VF_PHYS,
                                      constants.VF_VG, constants.VF_NAME,
                                      constants.VF_SIZE, constants.VF_INSTANCE),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def Exec(self, feedback_fn):
    """Computes the list of volumes and their attributes.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    volumes = self.rpc.call_node_volumes(node_uuids)

    ilist = self.cfg.GetAllInstancesInfo()
    vol2inst = MapInstanceLvsToNodes(ilist.values())
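
    # vol2inst maps (node_uuid, "<vg>/<lv_name>") pairs to the owning
    # instance (see MapInstanceLvsToNodes); the VF_INSTANCE lookup below
    # builds its key the same way from each volume's VF_VG and VF_NAME.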
1253

    
1254
    output = []
1255
    for node_uuid in node_uuids:
1256
      nresult = volumes[node_uuid]
1257
      if nresult.offline:
1258
        continue
1259
      msg = nresult.fail_msg
1260
      if msg:
1261
        self.LogWarning("Can't compute volume data on node %s: %s",
1262
                        self.cfg.GetNodeName(node_uuid), msg)
1263
        continue
1264

    
1265
      node_vols = sorted(nresult.payload,
1266
                         key=operator.itemgetter(constants.VF_DEV))
1267

    
1268
      for vol in node_vols:
1269
        node_output = []
1270
        for field in self.op.output_fields:
1271
          if field == constants.VF_NODE:
1272
            val = self.cfg.GetNodeName(node_uuid)
1273
          elif field == constants.VF_PHYS:
1274
            val = vol[constants.VF_DEV]
1275
          elif field == constants.VF_VG:
1276
            val = vol[constants.VF_VG]
1277
          elif field == constants.VF_NAME:
1278
            val = vol[constants.VF_NAME]
1279
          elif field == constants.VF_SIZE:
1280
            val = int(float(vol[constants.VF_SIZE]))
1281
          elif field == constants.VF_INSTANCE:
1282
            inst = vol2inst.get((node_uuid, vol[constants.VF_VG] + "/" +
1283
                                 vol[constants.VF_NAME]), None)
1284
            if inst is not None:
1285
              val = inst.name
1286
            else:
1287
              val = "-"
1288
          else:
1289
            raise errors.ParameterError(field)
1290
          node_output.append(str(val))
1291

    
1292
        output.append(node_output)
1293

    
1294
    return output


class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def _DetermineStorageType(self):
    """Determines the default storage type of the cluster.

    """
    enabled_disk_templates = self.cfg.GetClusterInfo().enabled_disk_templates
    default_storage_type = \
        constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[enabled_disk_templates[0]]
    return default_storage_type

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.storage_type:
      CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()
      supported_storage_types = constants.STS_REPORT_NODE_STORAGE
      if self.storage_type not in supported_storage_types:
        raise errors.OpPrereqError(
            "Storage reporting for storage type '%s' is not supported. Please"
            " use the --storage-type option to specify one of the supported"
            " storage types (%s) or set the default disk template to one that"
            " supports storage reporting." %
            (self.storage_type, utils.CommaJoin(supported_storage_types)),
            errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    if self.op.storage_type:
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()

    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]
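    # field_idx maps each field name to its column in the rows returned by
    # the storage_list RPC below; name_idx is the column used as sort key.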

    st_args = _GetStorageTypeArgs(self.cfg, self.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      nresult = data[node_uuid]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s",
                        node_name, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node_name
          elif field == constants.SF_TYPE:
            val = self.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result


class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_uuid)
    except ValueError:
      pass
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    assert node is not None

    masternode = self.cfg.GetMasterNode()
    if node.uuid == masternode:
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)

    for _, instance in self.cfg.GetAllInstancesInfo().items():
      if node.uuid in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove it first" % instance.name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    logging.info("Stopping the node daemon and removing configs from node %s",
                 self.node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # Promote nodes to master candidate as needed
    AdjustCandidatePool(self, [self.node.uuid], feedback_fn)
    self.context.RemoveNode(self.node)

    # Run post hooks on the node before it's removed
    RunPostHook(self, self.node.name)

    # we have to call this by name rather than by UUID, as the node is no longer
    # in the config
    result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    cluster = self.cfg.GetClusterInfo()

    # Remove node from candidate certificate list
    if self.node.master_candidate:
      utils.RemoveNodeFromCandidateCerts(self.node.uuid,
                                         cluster.candidate_certs)
      self.cfg.Update(cluster, feedback_fn)

    # Remove node from our /etc/hosts
    if cluster.modify_etc_hosts:
      master_node_uuid = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node_uuid,
                                              constants.ETC_HOSTS_REMOVE,
                                              self.node.name, None)
      result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)


class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    if (constants.SO_FIX_CONSISTENCY not in
        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
      raise errors.OpPrereqError("Storage units of type '%s' cannot be"
                                 " repaired" % storage_type,
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def _CheckFaultyDisks(self, instance, node_uuid):
    """Ensure faulty disks abort the opcode or at least warn."""
    try:
      if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                 node_uuid, True):
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                   " node '%s'" %
                                   (instance.name,
                                    self.cfg.GetNodeName(node_uuid)),
                                   errors.ECODE_STATE)
    except errors.OpPrereqError, err:
      if self.op.ignore_consistency:
        self.LogWarning(str(err.args[0]))
      else:
        raise

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

    # Check whether any instance on this node has faulty disks
    for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
      if not inst.disks_active:
        continue
      check_nodes = set(inst.all_nodes)
      check_nodes.discard(self.op.node_uuid)
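      # The node being repaired is deliberately skipped: presumably its
      # storage is the inconsistent piece that the repair will fix, while
      # faulty disks on the instance's other nodes make the repair unsafe.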
      for inst_node_uuid in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_uuid)

  def Exec(self, feedback_fn):
    """Repairs a storage unit on the node.

    """
    feedback_fn("Repairing storage unit '%s' on %s ..." %
                (self.op.name, self.op.node_name))

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_execute(self.op.node_uuid,
                                           self.op.storage_type, st_args,
                                           self.op.name,
                                           constants.SO_FIX_CONSISTENCY)
    result.Raise("Failed to repair storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))
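
# For reference: this LU presumably backs "gnt-node repair-storage", e.g.
# "gnt-node repair-storage node1.example.com lvm-vg xenvg" (node and volume
# group names are illustrative).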