root / lib / cmdlib / node.py @ 809a055b


#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with nodes."""

import logging
import operator

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
import ganeti.rpc.node as rpc
from ganeti import utils
from ganeti.masterd import iallocator

from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, ResultWithJobs
from ganeti.cmdlib.common import CheckParamsNotGlobal, \
  MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  IsExclusiveStorageEnabledNode, CheckNodePVs, \
  RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
  CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
  AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
  GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
  FindFaultyInstanceDisks, CheckStorageTypeEnabled, CreateNewClientCert, \
  AddNodeCertToCandidateCerts, RemoveNodeCertFromCandidateCerts


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max by one, so:
  mc_should = min(mc_should + 1, cp_size)
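  # Worked example (hypothetical numbers): with candidate_pool_size = 10,
  # mc_now = 3 and mc_should = 3 from the stats above, this gives
  # mc_should = min(3 + 1, 10) = 4; since 3 < 4, we promote ourselves.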
  return mc_now < mc_should


def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: L{objects.Node}
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip,
  and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  # this can be called with a new node, which has no UUID yet, so perform the
  # RPC call using its name
  result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node.name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)


class LUNodeAdd(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _NFLAGS = ["master_capable", "vm_capable"]

  def CheckArguments(self):
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
    # validate/normalize the node name
    self.hostname = netutils.GetHostname(name=self.op.node_name,
                                         family=self.primary_ip_family)
    self.op.node_name = self.hostname.name

    if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
      raise errors.OpPrereqError("Cannot readd the master node",
                                 errors.ECODE_STATE)

    if self.op.readd and self.op.group:
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                 " being readded", errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    hook_nodes = self.cfg.GetNodeList()
    new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
    if new_node_info is not None:
      # Exclude added node
      hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))

    # add the new node as post hook node by name; it does not have a UUID yet
    return (hook_nodes, hook_nodes)

  def PreparePostHookNodes(self, post_hook_node_uuids):
    return post_hook_node_uuids + [self.new_node.uuid]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.hostname.name
    self.op.primary_ip = self.hostname.ip
    if self.op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using an IPv6 primary address, a valid"
                                   " IPv4 address must be given as secondary",
                                   errors.ECODE_INVAL)
      self.op.secondary_ip = self.op.primary_ip

    secondary_ip = self.op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip, errors.ECODE_INVAL)

    existing_node_info = self.cfg.GetNodeInfoByName(node_name)
    if not self.op.readd and existing_node_info is not None:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node_name, errors.ECODE_EXISTS)
    elif self.op.readd and existing_node_info is None:
      raise errors.OpPrereqError("Node %s is not in the configuration" %
                                 node_name, errors.ECODE_NOENT)

    self.changed_primary_ip = False

    for existing_node in self.cfg.GetAllNodesInfo().values():
      if self.op.readd and node_name == existing_node.name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != self.op.primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == self.op.primary_ip or
          existing_node.secondary_ip == self.op.primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # After this 'if' block, None is no longer a valid value for the
    # _capable op attributes
    if self.op.readd:
      assert existing_node_info is not None, \
        "Can't retrieve locked node %s" % node_name
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, getattr(existing_node_info, attr))
    else:
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, True)

    if self.op.readd and not self.op.vm_capable:
      pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % node_name,
                                   errors.ECODE_STATE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = self.cfg.GetMasterNodeInfo()
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == self.op.primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                              source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [existing_node_info.uuid]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False

    node_group = self.cfg.LookupNodeGroup(self.op.group)

    if self.op.readd:
      self.new_node = existing_node_info
    else:
      self.new_node = objects.Node(name=node_name,
                                   primary_ip=self.op.primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=node_group, ndparams={})

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)

    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
    #       it a property on the base class.
    rpcrunner = rpc.DnsOnlyRunner()
    result = rpcrunner.call_version([node_name])[node_name]
    result.Raise("Can't get version information from node %s" % node_name)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node_name, result.payload)
    else:
      raise errors.OpPrereqError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.payload),
                                 errors.ECODE_ENVIRON)

    vg_name = self.cfg.GetVGName()
    if vg_name is not None:
      vparams = {constants.NV_PVLIST: [vg_name]}
      excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
      cname = self.cfg.GetClusterName()
      result = rpcrunner.call_node_verify_light(
          [node_name], vparams, cname,
          self.cfg.GetClusterInfo().hvparams,
          {node_name: node_group},
          self.cfg.GetAllNodeGroupsInfoDict()
        )[node_name]
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
      if errmsgs:
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)

  def _InitOpenVSwitch(self):
    filled_ndparams = self.cfg.GetClusterInfo().FillND(
      self.new_node, self.cfg.GetNodeGroup(self.new_node.group))

    ovs = filled_ndparams.get(constants.ND_OVS, None)
    ovs_name = filled_ndparams.get(constants.ND_OVS_NAME, None)
    ovs_link = filled_ndparams.get(constants.ND_OVS_LINK, None)

    if ovs:
      if not ovs_link:
        self.LogInfo("No physical interface for OpenvSwitch was given."
                     " OpenvSwitch will not have an outside connection. This"
                     " might not be what you want.")

      result = self.rpc.call_node_configure_ovs(
                 self.new_node.name, ovs_name, ovs_link)
      result.Raise("Failed to initialize OpenVSwitch on new node")

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # We are adding a new node, so we assume it's powered
    self.new_node.powered = True

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      self.new_node.offline = False
      self.new_node.drained = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      self.new_node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        self.new_node.primary_ip = self.op.primary_ip

    # copy the master/vm_capable flags
    for attr in self._NFLAGS:
      setattr(self.new_node, attr, getattr(self.op, attr))

    # notify the user about any possible mc promotion
    if self.new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    if self.op.ndparams:
      self.new_node.ndparams = self.op.ndparams
    else:
      self.new_node.ndparams = {}

    if self.op.hv_state:
      self.new_node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.new_node.disk_state_static = self.new_disk_state

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(
                 master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
                 self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    if self.new_node.secondary_ip != self.new_node.primary_ip:
      _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
                               False)

    node_verifier_uuids = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: ([self.new_node.name], {}),
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(
               node_verifier_uuids, node_verify_param,
               self.cfg.GetClusterName(),
               self.cfg.GetClusterInfo().hvparams,
               {self.new_node.name: self.cfg.LookupNodeGroup(self.op.group)},
               self.cfg.GetAllNodeGroupsInfoDict()
               )
    for verifier in node_verifier_uuids:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed")

    self._InitOpenVSwitch()

    if self.op.readd:
      self.context.ReaddNode(self.new_node)
      RedistributeAncillaryFiles(self)
      # make sure we redistribute the config
      self.cfg.Update(self.new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not self.new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
        result.Warn("Node failed to demote itself from master candidate status",
                    self.LogWarning)
    else:
      self.context.AddNode(self.cfg, self.new_node, self.proc.GetECId())
      RedistributeAncillaryFiles(self)

    # We create a new certificate even if the node is readded
    digest = CreateNewClientCert(self, self.new_node.uuid)
    if self.new_node.master_candidate:
      self.cfg.AddNodeToCandidateCerts(self.new_node.uuid, digest)
    else:
      self.cfg.RemoveNodeFromCandidateCerts(self.new_node.uuid, warn_fn=None)


class LUNodeSetParams(LogicalUnit):
  """Modifies the parameters of a node.

  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
      to the node role (as _ROLE_*)
  @cvar _R2F: a dictionary from node role to tuples of flags
  @cvar _FLAGS: a list of attribute names corresponding to the flags

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
  _F2R = {
    (True, False, False): _ROLE_CANDIDATE,
    (False, True, False): _ROLE_DRAINED,
    (False, False, True): _ROLE_OFFLINE,
    (False, False, False): _ROLE_REGULAR,
    }
  _R2F = dict((v, k) for k, v in _F2R.items())
  _FLAGS = ["master_candidate", "drained", "offline"]

    
451
  def CheckArguments(self):
452
    (self.op.node_uuid, self.op.node_name) = \
453
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
454
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
455
                self.op.master_capable, self.op.vm_capable,
456
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
457
                self.op.disk_state]
458
    if all_mods.count(None) == len(all_mods):
459
      raise errors.OpPrereqError("Please pass at least one modification",
460
                                 errors.ECODE_INVAL)
461
    if all_mods.count(True) > 1:
462
      raise errors.OpPrereqError("Can't set the node into more than one"
463
                                 " state at the same time",
464
                                 errors.ECODE_INVAL)
465

    
466
    # Boolean value that tells us whether we might be demoting from MC
467
    self.might_demote = (self.op.master_candidate is False or
468
                         self.op.offline is True or
469
                         self.op.drained is True or
470
                         self.op.master_capable is False)
471

    
472
    if self.op.secondary_ip:
473
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
474
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
475
                                   " address" % self.op.secondary_ip,
476
                                   errors.ECODE_INVAL)
477

    
478
    self.lock_all = self.op.auto_promote and self.might_demote
479
    self.lock_instances = self.op.secondary_ip is not None
480

    
481
  def _InstanceFilter(self, instance):
482
    """Filter for getting affected instances.
483

484
    """
485
    return (instance.disk_template in constants.DTS_INT_MIRROR and
486
            self.op.node_uuid in self.cfg.GetInstanceNodes(instance.uuid))
487

    
488
  def ExpandNames(self):
489
    if self.lock_all:
490
      self.needed_locks = {
491
        locking.LEVEL_NODE: locking.ALL_SET,
492

    
493
        # Block allocations when all nodes are locked
494
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
495
        }
496
    else:
497
      self.needed_locks = {
498
        locking.LEVEL_NODE: self.op.node_uuid,
499
        }
500

    
501
    # Since modifying a node can have severe effects on currently running
502
    # operations the resource lock is at least acquired in shared mode
503
    self.needed_locks[locking.LEVEL_NODE_RES] = \
504
      self.needed_locks[locking.LEVEL_NODE]
505

    
506
    # Get all locks except nodes in shared mode; they are not used for anything
507
    # but read-only access
508
    self.share_locks = ShareAll()
509
    self.share_locks[locking.LEVEL_NODE] = 0
510
    self.share_locks[locking.LEVEL_NODE_RES] = 0
511
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 0
512

    
513
    if self.lock_instances:
514
      self.needed_locks[locking.LEVEL_INSTANCE] = \
515
        self.cfg.GetInstanceNames(
516
          self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())
517

    
518
  def BuildHooksEnv(self):
519
    """Build hooks env.
520

521
    This runs on the master node.
522

523
    """
524
    return {
525
      "OP_TARGET": self.op.node_name,
526
      "MASTER_CANDIDATE": str(self.op.master_candidate),
527
      "OFFLINE": str(self.op.offline),
528
      "DRAINED": str(self.op.drained),
529
      "MASTER_CAPABLE": str(self.op.master_capable),
530
      "VM_CAPABLE": str(self.op.vm_capable),
531
      }
532

    
533
  def BuildHooksNodes(self):
534
    """Build hooks nodes.
535

536
    """
537
    nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
538
    return (nl, nl)
539

    
  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    if self.lock_instances:
      affected_instances = \
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)

      # Verify instance locks
      owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
      wanted_instance_names = frozenset([inst.name for inst in
                                         affected_instances.values()])
      if wanted_instance_names - owned_instance_names:
        raise errors.OpPrereqError("Instances affected by changing node %s's"
                                   " secondary IP address have changed since"
                                   " locks were acquired, wanted '%s', have"
                                   " '%s'; retry the operation" %
                                   (node.name,
                                    utils.CommaJoin(wanted_instance_names),
                                    utils.CommaJoin(owned_instance_names)),
                                   errors.ECODE_STATE)
    else:
      affected_instances = None

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if node.uuid == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via master-failover",
                                   errors.ECODE_INVAL)

    if self.op.master_candidate and not node.master_capable:
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
                                 " it a master candidate" % node.name,
                                 errors.ECODE_STATE)

    if self.op.vm_capable is False:
      (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
      if ipri or isec:
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
                                   " the vm_capable flag" % node.name,
                                   errors.ECODE_STATE)

    if node.master_candidate and self.might_demote and not self.lock_all:
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
      # check if after removing the current node, we're missing master
      # candidates
      (mc_remaining, mc_should, _) = \
          self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
      if mc_remaining < mc_should:
        raise errors.OpPrereqError("Not enough master candidates, please"
                                   " pass auto promote option to allow"
                                   " promotion (--auto-promote or RAPI"
                                   " auto_promote=True)", errors.ECODE_STATE)

    self.old_flags = old_flags = (node.master_candidate,
                                  node.drained, node.offline)
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
    self.old_role = old_role = self._F2R[old_flags]

    # Check for ineffective changes
    for attr in self._FLAGS:
      if getattr(self.op, attr) is False and getattr(node, attr) is False:
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
        setattr(self.op, attr, None)

    # Past this point, any flag change to False means a transition
    # away from the respective state, as only real changes are kept

    # TODO: We might query the real power state if it supports OOB
    if SupportsOob(self.cfg, node):
      if self.op.offline is False and not (node.powered or
                                           self.op.powered is True):
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
                                    " offline status can be reset") %
                                   self.op.node_name, errors.ECODE_STATE)
    elif self.op.powered is not None:
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
                                  " as it does not support out-of-band"
                                  " handling") % self.op.node_name,
                                 errors.ECODE_STATE)

    # If we're being deofflined/drained, we'll MC ourself if needed
    if (self.op.drained is False or self.op.offline is False or
        (self.op.master_capable and not node.master_capable)):
      if _DecideSelfPromotion(self):
        self.op.master_candidate = True
        self.LogInfo("Auto-promoting node to master candidate")

    # If we're no longer master capable, we'll demote ourselves from MC
    if self.op.master_capable is False and node.master_candidate:
      self.LogInfo("Demoting from master candidate")
      self.op.master_candidate = False

    # Compute new role
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
    if self.op.master_candidate:
      new_role = self._ROLE_CANDIDATE
    elif self.op.drained:
      new_role = self._ROLE_DRAINED
    elif self.op.offline:
      new_role = self._ROLE_OFFLINE
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
      # False is still in new flags, which means we're un-setting (the
      # only) True flag
      new_role = self._ROLE_REGULAR
    else: # no new flags, nothing, keep old role
      new_role = old_role

    self.new_role = new_role

    if old_role == self._ROLE_OFFLINE and new_role != old_role:
      # Trying to transition out of offline status
      result = self.rpc.call_version([node.uuid])[node.uuid]
      if result.fail_msg:
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
                                   " to report its version: %s" %
                                   (node.name, result.fail_msg),
                                   errors.ECODE_STATE)
      else:
        self.LogWarning("Transitioning node from offline to online state"
                        " without using re-add. Please make sure the node"
                        " is healthy!")

    # When changing the secondary ip, verify if this is a single-homed to
    # multi-homed transition or vice versa, and apply the relevant
    # restrictions.
    if self.op.secondary_ip:
      # Ok even without locking, because this can't be changed by any LU
      master = self.cfg.GetMasterNodeInfo()
      master_singlehomed = master.secondary_ip == master.primary_ip
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from single-homed to multi-homed"
                          " cluster; all nodes will require a secondary IP"
                          " address")
        else:
          raise errors.OpPrereqError("Changing the secondary ip on a"
                                     " single-homed cluster requires the"
                                     " --force option to be passed, and the"
                                     " target node to be the master",
                                     errors.ECODE_INVAL)
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from multi-homed to single-homed"
                          " cluster; secondary IP addresses will have to be"
                          " removed")
        else:
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
                                     " same as the primary IP on a multi-homed"
                                     " cluster, unless the --force option is"
                                     " passed, and the target node is the"
                                     " master", errors.ECODE_INVAL)

      assert not (set([inst.name for inst in affected_instances.values()]) -
                  self.owned_locks(locking.LEVEL_INSTANCE))

      if node.offline:
        if affected_instances:
          msg = ("Cannot change secondary IP address: offline node has"
                 " instances (%s) configured to use it" %
                 utils.CommaJoin(
                   [inst.name for inst in affected_instances.values()]))
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
      else:
        # On online nodes, check that no instances are running, and that
        # the node has the new ip and we can reach it.
        for instance in affected_instances.values():
          CheckInstanceState(self, instance, INSTANCE_DOWN,
                             msg="cannot change secondary ip")

        _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
        if master.uuid != node.uuid:
          # check reachability from master secondary ip to new secondary ip
          if not netutils.TcpPing(self.op.secondary_ip,
                                  constants.DEFAULT_NODED_PORT,
                                  source=master.secondary_ip):
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                       " based ping to node daemon port",
                                       errors.ECODE_ENVIRON)

    if self.op.ndparams:
      new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")
      self.new_ndparams = new_ndparams

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                                node.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    result = []

    if self.op.ndparams:
      node.ndparams = self.new_ndparams

    if self.op.powered is not None:
      node.powered = self.op.powered

    if self.op.hv_state:
      node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      node.disk_state_static = self.new_disk_state

    for attr in ["master_capable", "vm_capable"]:
      val = getattr(self.op, attr)
      if val is not None:
        setattr(node, attr, val)
        result.append((attr, str(val)))

    if self.op.secondary_ip:
      node.secondary_ip = self.op.secondary_ip
      result.append(("secondary_ip", self.op.secondary_ip))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)

    if self.new_role != self.old_role:
      # Tell the node to demote itself, if no longer MC and not offline
      if self.old_role == self._ROLE_CANDIDATE and \
          self.new_role != self._ROLE_OFFLINE:
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s", msg)

      new_flags = self._R2F[self.new_role]
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
        if of != nf:
          result.append((desc, str(nf)))
      (node.master_candidate, node.drained, node.offline) = new_flags
      self.cfg.Update(node, feedback_fn)

      # we locked all nodes, we adjust the CP before updating this node
      if self.lock_all:
        AdjustCandidatePool(self, [node.uuid])

      # if node gets promoted, grant RPC privileges
      if self.new_role == self._ROLE_CANDIDATE:
        AddNodeCertToCandidateCerts(self, self.cfg, node.uuid)
      # if node is demoted, revoke RPC privileges
      if self.old_role == self._ROLE_CANDIDATE:
        RemoveNodeCertFromCandidateCerts(self.cfg, node.uuid)

    # this will trigger job queue propagation or cleanup if the mc
    # flag changed
    if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
      self.context.ReaddNode(node)

    return result


class LUNodePowercycle(NoHooksLU):
  """Powercycles a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    default_hypervisor = self.cfg.GetHypervisorType()
    hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
    result = self.rpc.call_node_powercycle(self.op.node_uuid,
                                           default_hypervisor,
                                           hvparams)
    result.Raise("Failed to schedule the reboot")
    return result.payload


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodePrimaryInstances(cfg, node_uuid):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_uuid):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid in
                                  cfg.GetInstanceSecondaryNodes(inst.uuid))


def _GetNodeInstances(cfg, node_uuid):
  """Returns a list of all primary and secondary instances on a node.

  """

  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid in
                                  cfg.GetInstanceNodes(inst.uuid))


class LUNodeEvacuate(NoHooksLU):
  """Evacuates instances off a list of nodes.

  """
  REQ_BGL = False

  def CheckArguments(self):
    CheckIAllocatorOrNode(self, "iallocator", "remote_node")

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)
      assert self.op.remote_node

      if self.op.node_uuid == self.op.remote_node_uuid:
        raise errors.OpPrereqError("Can not use evacuated node as a new"
                                   " secondary node", errors.ECODE_INVAL)

      if self.op.mode != constants.NODE_EVAC_SEC:
        raise errors.OpPrereqError("Without the use of an iallocator only"
                                   " secondary instances can be evacuated",
                                   errors.ECODE_INVAL)

    # Declare locks
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

    # Determine nodes (via group) optimistically, needs verification once locks
    # have been acquired
    self.lock_nodes = self._DetermineNodes()

  def _DetermineNodes(self):
    """Gets the list of node UUIDs to operate on.

    """
    if self.op.remote_node is None:
      # Iallocator will choose any node(s) in the same group
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
    else:
      group_nodes = frozenset([self.op.remote_node_uuid])

    # Determine nodes to be locked
    return set([self.op.node_uuid]) | group_nodes

  def _DetermineInstances(self):
    """Builds list of instances to operate on.

    """
    assert self.op.mode in constants.NODE_EVAC_MODES

    if self.op.mode == constants.NODE_EVAC_PRI:
      # Primary instances only
      inst_fn = _GetNodePrimaryInstances
      assert self.op.remote_node is None, \
        "Evacuating primary instances requires iallocator"
    elif self.op.mode == constants.NODE_EVAC_SEC:
      # Secondary instances only
      inst_fn = _GetNodeSecondaryInstances
    else:
      # All instances
      assert self.op.mode == constants.NODE_EVAC_ALL
      inst_fn = _GetNodeInstances
      # TODO: In 2.6, change the iallocator interface to take an evacuation mode
      # per instance
      raise errors.OpPrereqError("Due to an issue with the iallocator"
                                 " interface it is not possible to evacuate"
                                 " all instances at once; specify explicitly"
                                 " whether to evacuate primary or secondary"
                                 " instances",
                                 errors.ECODE_INVAL)

    return inst_fn(self.cfg, self.op.node_uuid)

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        set(i.name for i in self._DetermineInstances())

    elif level == locking.LEVEL_NODEGROUP:
      # Lock node groups for all potential target nodes optimistically, needs
      # verification once nodes have been acquired
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)

    elif level == locking.LEVEL_NODE:
      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes

  def CheckPrereq(self):
    # Verify locks
    owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)

    need_nodes = self._DetermineNodes()

    if not owned_nodes.issuperset(need_nodes):
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
                                 " locks were acquired, current nodes are"
                                 " '%s', used to be '%s'; retry the"
                                 " operation" %
                                 (self.op.node_name,
                                  utils.CommaJoin(need_nodes),
                                  utils.CommaJoin(owned_nodes)),
                                 errors.ECODE_STATE)

    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
    if owned_groups != wanted_groups:
      raise errors.OpExecError("Node groups changed since locks were acquired,"
                               " current groups are '%s', used to be '%s';"
                               " retry the operation" %
                               (utils.CommaJoin(wanted_groups),
                                utils.CommaJoin(owned_groups)))

    # Determine affected instances
    self.instances = self._DetermineInstances()
    self.instance_names = [i.name for i in self.instances]

    if set(self.instance_names) != owned_instance_names:
      raise errors.OpExecError("Instances on node '%s' changed since locks"
                               " were acquired, current instances are '%s',"
                               " used to be '%s'; retry the operation" %
                               (self.op.node_name,
                                utils.CommaJoin(self.instance_names),
                                utils.CommaJoin(owned_instance_names)))

    if self.instance_names:
      self.LogInfo("Evacuating instances from node '%s': %s",
                   self.op.node_name,
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
    else:
      self.LogInfo("No instances to evacuate from node '%s'",
                   self.op.node_name)

    if self.op.remote_node is not None:
      for i in self.instances:
        if i.primary_node == self.op.remote_node_uuid:
          raise errors.OpPrereqError("Node %s is the primary node of"
                                     " instance %s, cannot use it as"
                                     " secondary" %
                                     (self.op.remote_node, i.name),
                                     errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)

    if not self.instance_names:
      # No instances to evacuate
      jobs = []

    elif self.op.iallocator is not None:
      # TODO: Implement relocation to other group
      req = iallocator.IAReqNodeEvac(evac_mode=self.op.mode,
                                     instances=list(self.instance_names))
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)

      ial.Run(self.op.iallocator)

      if not ial.success:
        raise errors.OpPrereqError("Can't compute node evacuation using"
                                   " iallocator '%s': %s" %
                                   (self.op.iallocator, ial.info),
                                   errors.ECODE_NORES)

      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)

    elif self.op.remote_node is not None:
      assert self.op.mode == constants.NODE_EVAC_SEC
      jobs = [
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
                                        remote_node=self.op.remote_node,
                                        disks=[],
                                        mode=constants.REPLACE_DISK_CHG,
                                        early_release=self.op.early_release)]
        for instance_name in self.instance_names]

    else:
      raise errors.ProgrammerError("No iallocator or remote node")

    return ResultWithJobs(jobs)


class LUNodeMigrate(LogicalUnit):
  """Migrate all instances from a node.

  """
  HPATH = "node-migrate"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False

  def CheckArguments(self):
    pass

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    return {
      "NODE_NAME": self.op.node_name,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()]
    return (nl, nl)

  def CheckPrereq(self):
    pass

  def Exec(self, feedback_fn):
    # Prepare jobs for migration instances
    jobs = [
      [opcodes.OpInstanceMigrate(
        instance_name=inst.name,
        mode=self.op.mode,
        live=self.op.live,
        iallocator=self.op.iallocator,
        target_node=self.op.target_node,
        allow_runtime_changes=self.op.allow_runtime_changes,
        ignore_ipolicy=self.op.ignore_ipolicy)]
      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]

    # TODO: Run iallocator in this opcode and pass correct placement options to
    # OpInstanceMigrate. Since other jobs can modify the cluster between
    # running the iallocator and the actual migration, a good consistency model
    # will have to be found.

    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset([self.op.node_uuid]))

    return ResultWithJobs(jobs)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage

  if storage_type == constants.ST_FILE:
    return [[cfg.GetFileStorageDir()]]
  elif storage_type == constants.ST_SHARED_FILE:
    dts = cfg.GetClusterInfo().enabled_disk_templates
    paths = []
    if constants.DT_SHARED_FILE in dts:
      paths.append(cfg.GetSharedFileStorageDir())
    if constants.DT_GLUSTER in dts:
      paths.append(cfg.GetGlusterStorageDir())
    return [paths]
  else:
    return []


class LUNodeModifyStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_uuid,
      }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_uuid,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


def _CheckOutputFields(fields, selected):
  """Checks whether all selected fields are valid according to fields.

  @type fields: L{utils.FieldSet}
  @param fields: fields set
  @type selected: L{utils.FieldSet}
  @param selected: fields set

  """
  delta = fields.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)


class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(constants.VF_NODE, constants.VF_PHYS,
                                      constants.VF_VG, constants.VF_NAME,
                                      constants.VF_SIZE, constants.VF_INSTANCE),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    volumes = self.rpc.call_node_volumes(node_uuids)

    ilist = self.cfg.GetAllInstancesInfo()
    vol2inst = MapInstanceLvsToNodes(self.cfg, ilist.values())
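    # vol2inst maps (node_uuid, "<vg>/<lv_name>") tuples to the instance
    # using that volume; it is consulted below for the "instance" field.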
1250

    
1251
    output = []
1252
    for node_uuid in node_uuids:
1253
      nresult = volumes[node_uuid]
1254
      if nresult.offline:
1255
        continue
1256
      msg = nresult.fail_msg
1257
      if msg:
1258
        self.LogWarning("Can't compute volume data on node %s: %s",
1259
                        self.cfg.GetNodeName(node_uuid), msg)
1260
        continue
1261

    
1262
      node_vols = sorted(nresult.payload,
1263
                         key=operator.itemgetter(constants.VF_DEV))
1264

    
1265
      for vol in node_vols:
1266
        node_output = []
1267
        for field in self.op.output_fields:
1268
          if field == constants.VF_NODE:
1269
            val = self.cfg.GetNodeName(node_uuid)
1270
          elif field == constants.VF_PHYS:
1271
            val = vol[constants.VF_DEV]
1272
          elif field == constants.VF_VG:
1273
            val = vol[constants.VF_VG]
1274
          elif field == constants.VF_NAME:
1275
            val = vol[constants.VF_NAME]
1276
          elif field == constants.VF_SIZE:
1277
            val = int(float(vol[constants.VF_SIZE]))
1278
          elif field == constants.VF_INSTANCE:
1279
            inst = vol2inst.get((node_uuid, vol[constants.VF_VG] + "/" +
1280
                                 vol[constants.VF_NAME]), None)
1281
            if inst is not None:
1282
              val = inst.name
1283
            else:
1284
              val = "-"
1285
          else:
1286
            raise errors.ParameterError(field)
1287
          node_output.append(str(val))
1288

    
1289
        output.append(node_output)
1290

    
1291
    return output
1292

    
1293

    
1294
class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def _DetermineStorageType(self):
    """Determines the default storage type of the cluster.

    """
    enabled_disk_templates = self.cfg.GetClusterInfo().enabled_disk_templates
    default_storage_type = \
        constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[enabled_disk_templates[0]]
    return default_storage_type

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.storage_type:
      CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()
      supported_storage_types = constants.STS_REPORT_NODE_STORAGE
      if self.storage_type not in supported_storage_types:
        raise errors.OpPrereqError(
            "Storage reporting for storage type '%s' is not supported. Please"
            " use the --storage-type option to specify one of the supported"
            " storage types (%s) or set the default disk template to one that"
            " supports storage reporting." %
            (self.storage_type, utils.CommaJoin(supported_storage_types)))

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    if self.op.storage_type:
      self.storage_type = self.op.storage_type
    else:
      self.storage_type = self._DetermineStorageType()

    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)
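
    # field_idx maps each requested field name to its column index in the
    # rows returned by the nodes; name_idx is used below to key and sort the
    # rows by storage unit name.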
    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]
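
    # st_args carries storage-type specific parameters for the RPC call
    # (e.g. the volume group name for LVM-based storage; illustrative, see
    # _GetStorageTypeArgs).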
    st_args = _GetStorageTypeArgs(self.cfg, self.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      nresult = data[node_uuid]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s",
                        node_name, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node_name
          elif field == constants.SF_TYPE:
            val = self.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result


class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_uuid)
    except ValueError:
      pass
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    assert node is not None

    masternode = self.cfg.GetMasterNode()
    if node.uuid == masternode:
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)
    for _, instance in self.cfg.GetAllInstancesInfo().items():
      if node.uuid in self.cfg.GetInstanceNodes(instance.uuid):
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first" % instance.name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    logging.info("Stopping the node daemon and removing configs from node %s",
                 self.node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # Promote nodes to master candidate as needed
    AdjustCandidatePool(self, [self.node.uuid])
    self.context.RemoveNode(self.cfg, self.node)

    # Run post hooks on the node before it's removed
    RunPostHook(self, self.node.name)

    # we have to call this by name rather than by UUID, as the node is no
    # longer in the config
    result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    cluster = self.cfg.GetClusterInfo()

    # Remove node from candidate certificate list
    if self.node.master_candidate:
      self.cfg.RemoveNodeFromCandidateCerts(self.node.uuid)

    # Remove node from our /etc/hosts
    if cluster.modify_etc_hosts:
      master_node_uuid = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node_uuid,
                                              constants.ETC_HOSTS_REMOVE,
                                              self.node.name, None)
      result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)


class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type
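
    # Only storage types that support the "fix consistency" operation can be
    # repaired; at the time of writing this is expected to be LVM volume
    # groups only (see constants.VALID_STORAGE_OPERATIONS).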
    if (constants.SO_FIX_CONSISTENCY not in
        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " repaired" % storage_type,
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def _CheckFaultyDisks(self, instance, node_uuid):
    """Ensure faulty disks abort the opcode or at least warn."""
    try:
      if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                 node_uuid, True):
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                   " node '%s'" %
                                   (instance.name,
                                    self.cfg.GetNodeName(node_uuid)),
                                   errors.ECODE_STATE)
    except errors.OpPrereqError, err:
      if self.op.ignore_consistency:
        self.LogWarning(str(err.args[0]))
      else:
        raise

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

    # Check whether any instance on this node has faulty disks
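    # on any of its *other* nodes; if so, the operation aborts unless
    # ignore_consistency is set, in which case only a warning is emitted.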
    for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
      if not inst.disks_active:
        continue
      check_nodes = set(self.cfg.GetInstanceNodes(inst.uuid))
      check_nodes.discard(self.op.node_uuid)
      for inst_node_uuid in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_uuid)

  def Exec(self, feedback_fn):
    feedback_fn("Repairing storage unit '%s' on %s ..." %
                (self.op.name, self.op.node_name))

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_execute(self.op.node_uuid,
                                           self.op.storage_type, st_args,
                                           self.op.name,
                                           constants.SO_FIX_CONSISTENCY)
    result.Raise("Failed to repair storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))