Revision 31b836b8

b/Makefile.am
 	lib/cmdlib/base.py \
 	lib/cmdlib/cluster.py \
 	lib/cmdlib/group.py \
+	lib/cmdlib/node.py \
 	lib/cmdlib/tags.py \
 	lib/cmdlib/network.py \
 	lib/cmdlib/test.py
b/lib/cmdlib/__init__.py
 
 from ganeti.cmdlib.base import ResultWithJobs, LogicalUnit, NoHooksLU, \
   Tasklet, _QueryBase
-from ganeti.cmdlib.common import _ExpandInstanceName, _ExpandItemName, \
+from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_ONLINE, \
+  INSTANCE_NOT_RUNNING, CAN_CHANGE_INSTANCE_OFFLINE, \
+  _ExpandInstanceName, _ExpandItemName, \
   _ExpandNodeName, _ShareAll, _CheckNodeGroupInstances, _GetWantedNodes, \
   _GetWantedInstances, _RunPostHook, _RedistributeAncillaryFiles, \
   _MergeAndVerifyHvState, _MergeAndVerifyDiskState, _GetUpdatedIPolicy, \
......
   _ComputeIPolicyInstanceViolation, _AnnotateDiskParams, _SupportsOob, \
   _ComputeIPolicySpecViolation, _GetDefaultIAllocator, \
   _CheckInstancesNodeGroups, _LoadNodeEvacResult, _MapInstanceDisksToNodes, \
-  _CheckInstanceNodeGroups
+  _CheckInstanceNodeGroups, _CheckParamsNotGlobal, \
+  _IsExclusiveStorageEnabledNode, _CheckInstanceState, \
+  _CheckIAllocatorOrNode, _FindFaultyInstanceDisks
 
 from ganeti.cmdlib.cluster import LUClusterActivateMasterIp, \
   LUClusterDeactivateMasterIp, LUClusterConfigQuery, LUClusterDestroy, \
......
 from ganeti.cmdlib.group import LUGroupAdd, LUGroupAssignNodes, \
   _GroupQuery, LUGroupQuery, LUGroupSetParams, LUGroupRemove, \
   LUGroupRename, LUGroupEvacuate, LUGroupVerifyDisks
+from ganeti.cmdlib.node import LUNodeAdd, LUNodeSetParams, \
+  LUNodePowercycle, LUNodeEvacuate, LUNodeMigrate, LUNodeModifyStorage, \
+  _NodeQuery, LUNodeQuery, LUNodeQueryvols, LUNodeQueryStorage, \
+  LUNodeRemove, LURepairNodeStorage
 from ganeti.cmdlib.tags import LUTagsGet, LUTagsSearch, LUTagsSet, LUTagsDel
 from ganeti.cmdlib.network import LUNetworkAdd, LUNetworkRemove, \
   LUNetworkSetParams, _NetworkQuery, LUNetworkQuery, LUNetworkConnect, \
......
 import ganeti.masterd.instance # pylint: disable=W0611
 
 
-# States of instance
-INSTANCE_DOWN = [constants.ADMINST_DOWN]
-INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
-INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE]
-
-#: Instance status in which an instance can be marked as offline/online
-CAN_CHANGE_INSTANCE_OFFLINE = (frozenset(INSTANCE_DOWN) | frozenset([
-  constants.ADMINST_OFFLINE,
-  ]))
-
-
-def _IsExclusiveStorageEnabledNode(cfg, node):
-  """Whether exclusive_storage is in effect for the given node.
-
-  @type cfg: L{config.ConfigWriter}
-  @param cfg: The cluster configuration
-  @type node: L{objects.Node}
-  @param node: The node
-  @rtype: bool
-  @return: The effective value of exclusive_storage
-
-  """
-  return cfg.GetNdParams(node)[constants.ND_EXCLUSIVE_STORAGE]
-
-
 def _IsExclusiveStorageEnabledNodeName(cfg, nodename):
   """Whether exclusive_storage is in effect for the given node.
 
......
     assert not lu.glm.is_owned(level), "No locks should be owned"
 
 
-def _CheckOutputFields(static, dynamic, selected):
-  """Checks whether all selected fields are valid.
-
-  @type static: L{utils.FieldSet}
-  @param static: static fields set
-  @type dynamic: L{utils.FieldSet}
-  @param dynamic: dynamic fields set
-
-  """
-  f = utils.FieldSet()
-  f.Extend(static)
-  f.Extend(dynamic)
-
-  delta = f.NonMatching(selected)
-  if delta:
-    raise errors.OpPrereqError("Unknown output fields selected: %s"
-                               % ",".join(delta), errors.ECODE_INVAL)
-
-
-def _CheckParamsNotGlobal(params, glob_pars, kind, bad_levels, good_levels):
-  """Make sure that none of the given paramters is global.
-
-  If a global parameter is found, an L{errors.OpPrereqError} exception is
-  raised. This is used to avoid setting global parameters for individual nodes.
-
-  @type params: dictionary
-  @param params: Parameters to check
-  @type glob_pars: dictionary
-  @param glob_pars: Forbidden parameters
-  @type kind: string
-  @param kind: Kind of parameters (e.g. "node")
-  @type bad_levels: string
-  @param bad_levels: Level(s) at which the parameters are forbidden (e.g.
-      "instance")
-  @type good_levels: strings
-  @param good_levels: Level(s) at which the parameters are allowed (e.g.
-      "cluster or group")
-
-  """
-  used_globals = glob_pars.intersection(params)
-  if used_globals:
-    msg = ("The following %s parameters are global and cannot"
-           " be customized at %s level, please modify them at"
-           " %s level: %s" %
-           (kind, bad_levels, good_levels, utils.CommaJoin(used_globals)))
-    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
-
-
 def _CheckNodeOnline(lu, node, msg=None):
   """Ensure that a given node is online.
 
......
     _CheckOSVariant(result.payload, os_name)
 
 
-def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
-  """Ensure that a node has the given secondary ip.
-
-  @type lu: L{LogicalUnit}
-  @param lu: the LU on behalf of which we make the check
-  @type node: string
-  @param node: the node to check
-  @type secondary_ip: string
-  @param secondary_ip: the ip to check
-  @type prereq: boolean
-  @param prereq: whether to throw a prerequisite or an execute error
-  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
-  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
-
-  """
-  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
-  result.Raise("Failure checking secondary ip on node %s" % node,
-               prereq=prereq, ecode=errors.ECODE_ENVIRON)
-  if not result.payload:
-    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
-           " please fix and re-run this command" % secondary_ip)
-    if prereq:
-      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
-    else:
-      raise errors.OpExecError(msg)
-
-
 def _GetClusterDomainSecret():
   """Reads the cluster domain secret.
 
......
                                strict=True)
 
 
-def _CheckInstanceState(lu, instance, req_states, msg=None):
-  """Ensure that an instance is in one of the required states.
-
-  @param lu: the LU on behalf of which we make the check
-  @param instance: the instance to check
-  @param msg: if passed, should be a message to replace the default one
-  @raise errors.OpPrereqError: if the instance is not in the required state
-
-  """
-  if msg is None:
-    msg = ("can't use instance from outside %s states" %
-           utils.CommaJoin(req_states))
-  if instance.admin_state not in req_states:
-    raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" %
-                               (instance.name, instance.admin_state, msg),
-                               errors.ECODE_STATE)
-
-  if constants.ADMINST_UP not in req_states:
-    pnode = instance.primary_node
-    if not lu.cfg.GetNodeInfo(pnode).offline:
-      ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
-      ins_l.Raise("Can't contact node %s for instance information" % pnode,
-                  prereq=True, ecode=errors.ECODE_ENVIRON)
-      if instance.name in ins_l.payload:
-        raise errors.OpPrereqError("Instance %s is running, %s" %
-                                   (instance.name, msg), errors.ECODE_STATE)
-    else:
-      lu.LogWarning("Primary node offline, ignoring check that instance"
-                     " is down")
-
-
 def _ComputeIPolicyInstanceSpecViolation(
   ipolicy, instance_spec, disk_template,
   _compute_fn=_ComputeIPolicySpecViolation):
......
   return _BuildInstanceHookEnv(**args) # pylint: disable=W0142
 
 
-def _DecideSelfPromotion(lu, exceptions=None):
-  """Decide whether I should promote myself as a master candidate.
-
-  """
-  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
-  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
-  # the new node will increase mc_max with one, so:
-  mc_should = min(mc_should + 1, cp_size)
-  return mc_now < mc_should
-
-
 def _CheckNicsBridgesExist(lu, target_nics, target_node):
   """Check that the brigdes needed by a list of nics exist.
 
......
     raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
 
 
-def _GetNodeInstancesInner(cfg, fn):
-  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
-
-
-def _GetNodeInstances(cfg, node_name):
-  """Returns a list of all primary and secondary instances on a node.
-
-  """
-
-  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
-
-
-def _GetNodePrimaryInstances(cfg, node_name):
-  """Returns primary instances on a node.
-
-  """
-  return _GetNodeInstancesInner(cfg,
-                                lambda inst: node_name == inst.primary_node)
-
-
-def _GetNodeSecondaryInstances(cfg, node_name):
-  """Returns secondary instances on a node.
-
-  """
-  return _GetNodeInstancesInner(cfg,
-                                lambda inst: node_name in inst.secondary_nodes)
-
-
-def _GetStorageTypeArgs(cfg, storage_type):
-  """Returns the arguments for a storage type.
-
-  """
-  # Special case for file storage
-  if storage_type == constants.ST_FILE:
-    # storage.FileStorage wants a list of storage directories
-    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
-
-  return []
-
-
-def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq):
-  faulty = []
-
-  for dev in instance.disks:
-    cfg.SetDiskID(dev, node_name)
-
-  result = rpc_runner.call_blockdev_getmirrorstatus(node_name, (instance.disks,
-                                                                instance))
-  result.Raise("Failed to get disk status from node %s" % node_name,
-               prereq=prereq, ecode=errors.ECODE_ENVIRON)
-
-  for idx, bdev_status in enumerate(result.payload):
-    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
-      faulty.append(idx)
-
-  return faulty
-
-
-def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
-  """Check the sanity of iallocator and node arguments and use the
-  cluster-wide iallocator if appropriate.
-
-  Check that at most one of (iallocator, node) is specified. If none is
-  specified, or the iallocator is L{constants.DEFAULT_IALLOCATOR_SHORTCUT},
-  then the LU's opcode's iallocator slot is filled with the cluster-wide
-  default iallocator.
-
-  @type iallocator_slot: string
-  @param iallocator_slot: the name of the opcode iallocator slot
-  @type node_slot: string
-  @param node_slot: the name of the opcode target node slot
-
-  """
-  node = getattr(lu.op, node_slot, None)
-  ialloc = getattr(lu.op, iallocator_slot, None)
-  if node == []:
-    node = None
-
-  if node is not None and ialloc is not None:
-    raise errors.OpPrereqError("Do not specify both, iallocator and node",
-                               errors.ECODE_INVAL)
-  elif ((node is None and ialloc is None) or
-        ialloc == constants.DEFAULT_IALLOCATOR_SHORTCUT):
-    default_iallocator = lu.cfg.GetDefaultIAllocator()
-    if default_iallocator:
-      setattr(lu.op, iallocator_slot, default_iallocator)
-    else:
-      raise errors.OpPrereqError("No iallocator or node given and no"
-                                 " cluster-wide default iallocator found;"
-                                 " please specify either an iallocator or a"
-                                 " node, or set a cluster-wide default"
-                                 " iallocator", errors.ECODE_INVAL)
-
-
 def _CheckHostnameSane(lu, name):
   """Ensures that a given hostname resolves to a 'sane' name.
 
......
     return self.eq.OldStyleQuery(self)
 
 
1434
class LUNodeRemove(LogicalUnit):
1435
  """Logical unit for removing a node.
1436

  
1437
  """
1438
  HPATH = "node-remove"
1439
  HTYPE = constants.HTYPE_NODE
1440

  
1441
  def BuildHooksEnv(self):
1442
    """Build hooks env.
1443

  
1444
    """
1445
    return {
1446
      "OP_TARGET": self.op.node_name,
1447
      "NODE_NAME": self.op.node_name,
1448
      }
1449

  
1450
  def BuildHooksNodes(self):
1451
    """Build hooks nodes.
1452

  
1453
    This doesn't run on the target node in the pre phase as a failed
1454
    node would then be impossible to remove.
1455

  
1456
    """
1457
    all_nodes = self.cfg.GetNodeList()
1458
    try:
1459
      all_nodes.remove(self.op.node_name)
1460
    except ValueError:
1461
      pass
1462
    return (all_nodes, all_nodes)
1463

  
1464
  def CheckPrereq(self):
1465
    """Check prerequisites.
1466

  
1467
    This checks:
1468
     - the node exists in the configuration
1469
     - it does not have primary or secondary instances
1470
     - it's not the master
1471

  
1472
    Any errors are signaled by raising errors.OpPrereqError.
1473

  
1474
    """
1475
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
1476
    node = self.cfg.GetNodeInfo(self.op.node_name)
1477
    assert node is not None
1478

  
1479
    masternode = self.cfg.GetMasterNode()
1480
    if node.name == masternode:
1481
      raise errors.OpPrereqError("Node is the master node, failover to another"
1482
                                 " node is required", errors.ECODE_INVAL)
1483

  
1484
    for instance_name, instance in self.cfg.GetAllInstancesInfo().items():
1485
      if node.name in instance.all_nodes:
1486
        raise errors.OpPrereqError("Instance %s is still running on the node,"
1487
                                   " please remove first" % instance_name,
1488
                                   errors.ECODE_INVAL)
1489
    self.op.node_name = node.name
1490
    self.node = node
1491

  
1492
  def Exec(self, feedback_fn):
1493
    """Removes the node from the cluster.
1494

  
1495
    """
1496
    node = self.node
1497
    logging.info("Stopping the node daemon and removing configs from node %s",
1498
                 node.name)
1499

  
1500
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
1501

  
1502
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
1503
      "Not owning BGL"
1504

  
1505
    # Promote nodes to master candidate as needed
1506
    _AdjustCandidatePool(self, exceptions=[node.name])
1507
    self.context.RemoveNode(node.name)
1508

  
1509
    # Run post hooks on the node before it's removed
1510
    _RunPostHook(self, node.name)
1511

  
1512
    result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
1513
    msg = result.fail_msg
1514
    if msg:
1515
      self.LogWarning("Errors encountered on the remote node while leaving"
1516
                      " the cluster: %s", msg)
1517

  
1518
    # Remove node from our /etc/hosts
1519
    if self.cfg.GetClusterInfo().modify_etc_hosts:
1520
      master_node = self.cfg.GetMasterNode()
1521
      result = self.rpc.call_etc_hosts_modify(master_node,
1522
                                              constants.ETC_HOSTS_REMOVE,
1523
                                              node.name, None)
1524
      result.Raise("Can't update hosts file with new host data")
1525
      _RedistributeAncillaryFiles(self)
1526

  
1527

  
1528
class _NodeQuery(_QueryBase):
1529
  FIELDS = query.NODE_FIELDS
1530

  
1531
  def ExpandNames(self, lu):
1532
    lu.needed_locks = {}
1533
    lu.share_locks = _ShareAll()
1534

  
1535
    if self.names:
1536
      self.wanted = _GetWantedNodes(lu, self.names)
1537
    else:
1538
      self.wanted = locking.ALL_SET
1539

  
1540
    self.do_locking = (self.use_locking and
1541
                       query.NQ_LIVE in self.requested_data)
1542

  
1543
    if self.do_locking:
1544
      # If any non-static field is requested we need to lock the nodes
1545
      lu.needed_locks[locking.LEVEL_NODE] = self.wanted
1546
      lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1547

  
1548
  def DeclareLocks(self, lu, level):
1549
    pass
1550

  
1551
  def _GetQueryData(self, lu):
1552
    """Computes the list of nodes and their attributes.
1553

  
1554
    """
1555
    all_info = lu.cfg.GetAllNodesInfo()
1556

  
1557
    nodenames = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)
1558

  
1559
    # Gather data as requested
1560
    if query.NQ_LIVE in self.requested_data:
1561
      # filter out non-vm_capable nodes
1562
      toquery_nodes = [name for name in nodenames if all_info[name].vm_capable]
1563

  
1564
      es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, toquery_nodes)
1565
      node_data = lu.rpc.call_node_info(toquery_nodes, [lu.cfg.GetVGName()],
1566
                                        [lu.cfg.GetHypervisorType()], es_flags)
1567
      live_data = dict((name, rpc.MakeLegacyNodeInfo(nresult.payload))
1568
                       for (name, nresult) in node_data.items()
1569
                       if not nresult.fail_msg and nresult.payload)
1570
    else:
1571
      live_data = None
1572

  
1573
    if query.NQ_INST in self.requested_data:
1574
      node_to_primary = dict([(name, set()) for name in nodenames])
1575
      node_to_secondary = dict([(name, set()) for name in nodenames])
1576

  
1577
      inst_data = lu.cfg.GetAllInstancesInfo()
1578

  
1579
      for inst in inst_data.values():
1580
        if inst.primary_node in node_to_primary:
1581
          node_to_primary[inst.primary_node].add(inst.name)
1582
        for secnode in inst.secondary_nodes:
1583
          if secnode in node_to_secondary:
1584
            node_to_secondary[secnode].add(inst.name)
1585
    else:
1586
      node_to_primary = None
1587
      node_to_secondary = None
1588

  
1589
    if query.NQ_OOB in self.requested_data:
1590
      oob_support = dict((name, bool(_SupportsOob(lu.cfg, node)))
1591
                         for name, node in all_info.iteritems())
1592
    else:
1593
      oob_support = None
1594

  
1595
    if query.NQ_GROUP in self.requested_data:
1596
      groups = lu.cfg.GetAllNodeGroupsInfo()
1597
    else:
1598
      groups = {}
1599

  
1600
    return query.NodeQueryData([all_info[name] for name in nodenames],
1601
                               live_data, lu.cfg.GetMasterNode(),
1602
                               node_to_primary, node_to_secondary, groups,
1603
                               oob_support, lu.cfg.GetClusterInfo())
1604

  
1605

  
1606
class LUNodeQuery(NoHooksLU):
1607
  """Logical unit for querying nodes.
1608

  
1609
  """
1610
  # pylint: disable=W0142
1611
  REQ_BGL = False
1612

  
1613
  def CheckArguments(self):
1614
    self.nq = _NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
1615
                         self.op.output_fields, self.op.use_locking)
1616

  
1617
  def ExpandNames(self):
1618
    self.nq.ExpandNames(self)
1619

  
1620
  def DeclareLocks(self, level):
1621
    self.nq.DeclareLocks(self, level)
1622

  
1623
  def Exec(self, feedback_fn):
1624
    return self.nq.OldStyleQuery(self)
1625

  
1626

  
1627
class LUNodeQueryvols(NoHooksLU):
1628
  """Logical unit for getting volumes on node(s).
1629

  
1630
  """
1631
  REQ_BGL = False
1632
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1633
  _FIELDS_STATIC = utils.FieldSet("node")
1634

  
1635
  def CheckArguments(self):
1636
    _CheckOutputFields(static=self._FIELDS_STATIC,
1637
                       dynamic=self._FIELDS_DYNAMIC,
1638
                       selected=self.op.output_fields)
1639

  
1640
  def ExpandNames(self):
1641
    self.share_locks = _ShareAll()
1642

  
1643
    if self.op.nodes:
1644
      self.needed_locks = {
1645
        locking.LEVEL_NODE: _GetWantedNodes(self, self.op.nodes),
1646
        }
1647
    else:
1648
      self.needed_locks = {
1649
        locking.LEVEL_NODE: locking.ALL_SET,
1650
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1651
        }
1652

  
1653
  def Exec(self, feedback_fn):
1654
    """Computes the list of nodes and their attributes.
1655

  
1656
    """
1657
    nodenames = self.owned_locks(locking.LEVEL_NODE)
1658
    volumes = self.rpc.call_node_volumes(nodenames)
1659

  
1660
    ilist = self.cfg.GetAllInstancesInfo()
1661
    vol2inst = _MapInstanceDisksToNodes(ilist.values())
1662

  
1663
    output = []
1664
    for node in nodenames:
1665
      nresult = volumes[node]
1666
      if nresult.offline:
1667
        continue
1668
      msg = nresult.fail_msg
1669
      if msg:
1670
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
1671
        continue
1672

  
1673
      node_vols = sorted(nresult.payload,
1674
                         key=operator.itemgetter("dev"))
1675

  
1676
      for vol in node_vols:
1677
        node_output = []
1678
        for field in self.op.output_fields:
1679
          if field == "node":
1680
            val = node
1681
          elif field == "phys":
1682
            val = vol["dev"]
1683
          elif field == "vg":
1684
            val = vol["vg"]
1685
          elif field == "name":
1686
            val = vol["name"]
1687
          elif field == "size":
1688
            val = int(float(vol["size"]))
1689
          elif field == "instance":
1690
            val = vol2inst.get((node, vol["vg"] + "/" + vol["name"]), "-")
1691
          else:
1692
            raise errors.ParameterError(field)
1693
          node_output.append(str(val))
1694

  
1695
        output.append(node_output)
1696

  
1697
    return output
1698

  
1699

  
1700
class LUNodeQueryStorage(NoHooksLU):
1701
  """Logical unit for getting information on storage units on node(s).
1702

  
1703
  """
1704
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
1705
  REQ_BGL = False
1706

  
1707
  def CheckArguments(self):
1708
    _CheckOutputFields(static=self._FIELDS_STATIC,
1709
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
1710
                       selected=self.op.output_fields)
1711

  
1712
  def ExpandNames(self):
1713
    self.share_locks = _ShareAll()
1714

  
1715
    if self.op.nodes:
1716
      self.needed_locks = {
1717
        locking.LEVEL_NODE: _GetWantedNodes(self, self.op.nodes),
1718
        }
1719
    else:
1720
      self.needed_locks = {
1721
        locking.LEVEL_NODE: locking.ALL_SET,
1722
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1723
        }
1724

  
1725
  def Exec(self, feedback_fn):
1726
    """Computes the list of nodes and their attributes.
1727

  
1728
    """
1729
    self.nodes = self.owned_locks(locking.LEVEL_NODE)
1730

  
1731
    # Always get name to sort by
1732
    if constants.SF_NAME in self.op.output_fields:
1733
      fields = self.op.output_fields[:]
1734
    else:
1735
      fields = [constants.SF_NAME] + self.op.output_fields
1736

  
1737
    # Never ask for node or type as it's only known to the LU
1738
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
1739
      while extra in fields:
1740
        fields.remove(extra)
1741

  
1742
    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
1743
    name_idx = field_idx[constants.SF_NAME]
1744

  
1745
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1746
    data = self.rpc.call_storage_list(self.nodes,
1747
                                      self.op.storage_type, st_args,
1748
                                      self.op.name, fields)
1749

  
1750
    result = []
1751

  
1752
    for node in utils.NiceSort(self.nodes):
1753
      nresult = data[node]
1754
      if nresult.offline:
1755
        continue
1756

  
1757
      msg = nresult.fail_msg
1758
      if msg:
1759
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
1760
        continue
1761

  
1762
      rows = dict([(row[name_idx], row) for row in nresult.payload])
1763

  
1764
      for name in utils.NiceSort(rows.keys()):
1765
        row = rows[name]
1766

  
1767
        out = []
1768

  
1769
        for field in self.op.output_fields:
1770
          if field == constants.SF_NODE:
1771
            val = node
1772
          elif field == constants.SF_TYPE:
1773
            val = self.op.storage_type
1774
          elif field in field_idx:
1775
            val = row[field_idx[field]]
1776
          else:
1777
            raise errors.ParameterError(field)
1778

  
1779
          out.append(val)
1780

  
1781
        result.append(out)
1782

  
1783
    return result
1784

  
1785

  
1786 1206
class _InstanceQuery(_QueryBase):
1787 1207
  FIELDS = query.INSTANCE_FIELDS
1788 1208

  
......
1833 1253
    owned_groups = frozenset(lu.owned_locks(locking.LEVEL_NODEGROUP))
1834 1254

  
1835 1255
    # Check if node groups for locked instances are still correct
1836
    for instance_name in owned_instances:
1837
      _CheckInstanceNodeGroups(lu.cfg, instance_name, owned_groups)
1838

  
1839
  def _GetQueryData(self, lu):
1840
    """Computes the list of instances and their attributes.
1841

  
1842
    """
1843
    if self.do_grouplocks:
1844
      self._CheckGroupLocks(lu)
1845

  
1846
    cluster = lu.cfg.GetClusterInfo()
1847
    all_info = lu.cfg.GetAllInstancesInfo()
1848

  
1849
    instance_names = self._GetNames(lu, all_info.keys(), locking.LEVEL_INSTANCE)
1850

  
1851
    instance_list = [all_info[name] for name in instance_names]
1852
    nodes = frozenset(itertools.chain(*(inst.all_nodes
1853
                                        for inst in instance_list)))
1854
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
1855
    bad_nodes = []
1856
    offline_nodes = []
1857
    wrongnode_inst = set()
1858

  
1859
    # Gather data as requested
1860
    if self.requested_data & set([query.IQ_LIVE, query.IQ_CONSOLE]):
1861
      live_data = {}
1862
      node_data = lu.rpc.call_all_instances_info(nodes, hv_list)
1863
      for name in nodes:
1864
        result = node_data[name]
1865
        if result.offline:
1866
          # offline nodes will be in both lists
1867
          assert result.fail_msg
1868
          offline_nodes.append(name)
1869
        if result.fail_msg:
1870
          bad_nodes.append(name)
1871
        elif result.payload:
1872
          for inst in result.payload:
1873
            if inst in all_info:
1874
              if all_info[inst].primary_node == name:
1875
                live_data.update(result.payload)
1876
              else:
1877
                wrongnode_inst.add(inst)
1878
            else:
1879
              # orphan instance; we don't list it here as we don't
1880
              # handle this case yet in the output of instance listing
1881
              logging.warning("Orphan instance '%s' found on node %s",
1882
                              inst, name)
1883
        # else no instance is alive
1884
    else:
1885
      live_data = {}
1886

  
1887
    if query.IQ_DISKUSAGE in self.requested_data:
1888
      gmi = ganeti.masterd.instance
1889
      disk_usage = dict((inst.name,
1890
                         gmi.ComputeDiskSize(inst.disk_template,
1891
                                             [{constants.IDISK_SIZE: disk.size}
1892
                                              for disk in inst.disks]))
1893
                        for inst in instance_list)
1894
    else:
1895
      disk_usage = None
1896

  
1897
    if query.IQ_CONSOLE in self.requested_data:
1898
      consinfo = {}
1899
      for inst in instance_list:
1900
        if inst.name in live_data:
1901
          # Instance is running
1902
          consinfo[inst.name] = _GetInstanceConsole(cluster, inst)
1903
        else:
1904
          consinfo[inst.name] = None
1905
      assert set(consinfo.keys()) == set(instance_names)
1906
    else:
1907
      consinfo = None
1908

  
1909
    if query.IQ_NODES in self.requested_data:
1910
      node_names = set(itertools.chain(*map(operator.attrgetter("all_nodes"),
1911
                                            instance_list)))
1912
      nodes = dict(lu.cfg.GetMultiNodeInfo(node_names))
1913
      groups = dict((uuid, lu.cfg.GetNodeGroup(uuid))
1914
                    for uuid in set(map(operator.attrgetter("group"),
1915
                                        nodes.values())))
1916
    else:
1917
      nodes = None
1918
      groups = None
1919

  
1920
    if query.IQ_NETWORKS in self.requested_data:
1921
      net_uuids = itertools.chain(*(lu.cfg.GetInstanceNetworks(i.name)
1922
                                    for i in instance_list))
1923
      networks = dict((uuid, lu.cfg.GetNetwork(uuid)) for uuid in net_uuids)
1924
    else:
1925
      networks = None
1926

  
1927
    return query.InstanceQueryData(instance_list, lu.cfg.GetClusterInfo(),
1928
                                   disk_usage, offline_nodes, bad_nodes,
1929
                                   live_data, wrongnode_inst, consinfo,
1930
                                   nodes, groups, networks)
1931

  
1932

  
1933
class LUQuery(NoHooksLU):
1934
  """Query for resources/items of a certain kind.
1935

  
1936
  """
1937
  # pylint: disable=W0142
1938
  REQ_BGL = False
1939

  
1940
  def CheckArguments(self):
1941
    qcls = _GetQueryImplementation(self.op.what)
1942

  
1943
    self.impl = qcls(self.op.qfilter, self.op.fields, self.op.use_locking)
1944

  
1945
  def ExpandNames(self):
1946
    self.impl.ExpandNames(self)
1947

  
1948
  def DeclareLocks(self, level):
1949
    self.impl.DeclareLocks(self, level)
1950

  
1951
  def Exec(self, feedback_fn):
1952
    return self.impl.NewStyleQuery(self)
1953

  
1954

  
1955
class LUQueryFields(NoHooksLU):
1956
  """Query for resources/items of a certain kind.
1957

  
1958
  """
1959
  # pylint: disable=W0142
1960
  REQ_BGL = False
1961

  
1962
  def CheckArguments(self):
1963
    self.qcls = _GetQueryImplementation(self.op.what)
1964

  
1965
  def ExpandNames(self):
1966
    self.needed_locks = {}
1967

  
1968
  def Exec(self, feedback_fn):
1969
    return query.QueryFields(self.qcls.FIELDS, self.op.fields)
1970

  
1971

  
1972
class LUNodeModifyStorage(NoHooksLU):
1973
  """Logical unit for modifying a storage volume on a node.
1974

  
1975
  """
1976
  REQ_BGL = False
1977

  
1978
  def CheckArguments(self):
1979
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
1980

  
1981
    storage_type = self.op.storage_type
1982

  
1983
    try:
1984
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
1985
    except KeyError:
1986
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
1987
                                 " modified" % storage_type,
1988
                                 errors.ECODE_INVAL)
1989

  
1990
    diff = set(self.op.changes.keys()) - modifiable
1991
    if diff:
1992
      raise errors.OpPrereqError("The following fields can not be modified for"
1993
                                 " storage units of type '%s': %r" %
1994
                                 (storage_type, list(diff)),
1995
                                 errors.ECODE_INVAL)
1996

  
1997
  def ExpandNames(self):
1998
    self.needed_locks = {
1999
      locking.LEVEL_NODE: self.op.node_name,
2000
      }
2001

  
2002
  def Exec(self, feedback_fn):
2003
    """Computes the list of nodes and their attributes.
2004

  
2005
    """
2006
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2007
    result = self.rpc.call_storage_modify(self.op.node_name,
2008
                                          self.op.storage_type, st_args,
2009
                                          self.op.name, self.op.changes)
2010
    result.Raise("Failed to modify storage unit '%s' on %s" %
2011
                 (self.op.name, self.op.node_name))
2012

  
2013

  
2014
class LUNodeAdd(LogicalUnit):
2015
  """Logical unit for adding node to the cluster.
2016

  
2017
  """
2018
  HPATH = "node-add"
2019
  HTYPE = constants.HTYPE_NODE
2020
  _NFLAGS = ["master_capable", "vm_capable"]
2021

  
2022
  def CheckArguments(self):
2023
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
2024
    # validate/normalize the node name
2025
    self.hostname = netutils.GetHostname(name=self.op.node_name,
2026
                                         family=self.primary_ip_family)
2027
    self.op.node_name = self.hostname.name
2028

  
2029
    if self.op.readd and self.op.node_name == self.cfg.GetMasterNode():
2030
      raise errors.OpPrereqError("Cannot readd the master node",
2031
                                 errors.ECODE_STATE)
2032

  
2033
    if self.op.readd and self.op.group:
2034
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
2035
                                 " being readded", errors.ECODE_INVAL)
2036

  
2037
  def BuildHooksEnv(self):
2038
    """Build hooks env.
2039

  
2040
    This will run on all nodes before, and on all nodes + the new node after.
2041

  
2042
    """
2043
    return {
2044
      "OP_TARGET": self.op.node_name,
2045
      "NODE_NAME": self.op.node_name,
2046
      "NODE_PIP": self.op.primary_ip,
2047
      "NODE_SIP": self.op.secondary_ip,
2048
      "MASTER_CAPABLE": str(self.op.master_capable),
2049
      "VM_CAPABLE": str(self.op.vm_capable),
2050
      }
2051

  
2052
  def BuildHooksNodes(self):
2053
    """Build hooks nodes.
2054

  
2055
    """
2056
    # Exclude added node
2057
    pre_nodes = list(set(self.cfg.GetNodeList()) - set([self.op.node_name]))
2058
    post_nodes = pre_nodes + [self.op.node_name, ]
2059

  
2060
    return (pre_nodes, post_nodes)
2061

  
2062
  def CheckPrereq(self):
2063
    """Check prerequisites.
2064

  
2065
    This checks:
2066
     - the new node is not already in the config
2067
     - it is resolvable
2068
     - its parameters (single/dual homed) matches the cluster
2069

  
2070
    Any errors are signaled by raising errors.OpPrereqError.
2071

  
2072
    """
2073
    cfg = self.cfg
2074
    hostname = self.hostname
2075
    node = hostname.name
2076
    primary_ip = self.op.primary_ip = hostname.ip
2077
    if self.op.secondary_ip is None:
2078
      if self.primary_ip_family == netutils.IP6Address.family:
2079
        raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
2080
                                   " IPv4 address must be given as secondary",
2081
                                   errors.ECODE_INVAL)
2082
      self.op.secondary_ip = primary_ip
2083

  
2084
    secondary_ip = self.op.secondary_ip
2085
    if not netutils.IP4Address.IsValid(secondary_ip):
2086
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
2087
                                 " address" % secondary_ip, errors.ECODE_INVAL)
2088

  
2089
    node_list = cfg.GetNodeList()
2090
    if not self.op.readd and node in node_list:
2091
      raise errors.OpPrereqError("Node %s is already in the configuration" %
2092
                                 node, errors.ECODE_EXISTS)
2093
    elif self.op.readd and node not in node_list:
2094
      raise errors.OpPrereqError("Node %s is not in the configuration" % node,
2095
                                 errors.ECODE_NOENT)
2096

  
2097
    self.changed_primary_ip = False
2098

  
2099
    for existing_node_name, existing_node in cfg.GetMultiNodeInfo(node_list):
2100
      if self.op.readd and node == existing_node_name:
2101
        if existing_node.secondary_ip != secondary_ip:
2102
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
2103
                                     " address configuration as before",
2104
                                     errors.ECODE_INVAL)
2105
        if existing_node.primary_ip != primary_ip:
2106
          self.changed_primary_ip = True
2107

  
2108
        continue
2109

  
2110
      if (existing_node.primary_ip == primary_ip or
2111
          existing_node.secondary_ip == primary_ip or
2112
          existing_node.primary_ip == secondary_ip or
2113
          existing_node.secondary_ip == secondary_ip):
2114
        raise errors.OpPrereqError("New node ip address(es) conflict with"
2115
                                   " existing node %s" % existing_node.name,
2116
                                   errors.ECODE_NOTUNIQUE)
2117

  
2118
    # After this 'if' block, None is no longer a valid value for the
2119
    # _capable op attributes
2120
    if self.op.readd:
2121
      old_node = self.cfg.GetNodeInfo(node)
2122
      assert old_node is not None, "Can't retrieve locked node %s" % node
2123
      for attr in self._NFLAGS:
2124
        if getattr(self.op, attr) is None:
2125
          setattr(self.op, attr, getattr(old_node, attr))
2126
    else:
2127
      for attr in self._NFLAGS:
2128
        if getattr(self.op, attr) is None:
2129
          setattr(self.op, attr, True)
2130

  
2131
    if self.op.readd and not self.op.vm_capable:
2132
      pri, sec = cfg.GetNodeInstances(node)
2133
      if pri or sec:
2134
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
2135
                                   " flag set to false, but it already holds"
2136
                                   " instances" % node,
2137
                                   errors.ECODE_STATE)
2138

  
2139
    # check that the type of the node (single versus dual homed) is the
2140
    # same as for the master
2141
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2142
    master_singlehomed = myself.secondary_ip == myself.primary_ip
2143
    newbie_singlehomed = secondary_ip == primary_ip
2144
    if master_singlehomed != newbie_singlehomed:
2145
      if master_singlehomed:
2146
        raise errors.OpPrereqError("The master has no secondary ip but the"
2147
                                   " new node has one",
2148
                                   errors.ECODE_INVAL)
2149
      else:
2150
        raise errors.OpPrereqError("The master has a secondary ip but the"
2151
                                   " new node doesn't have one",
2152
                                   errors.ECODE_INVAL)
2153

  
2154
    # checks reachability
2155
    if not netutils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2156
      raise errors.OpPrereqError("Node not reachable by ping",
2157
                                 errors.ECODE_ENVIRON)
2158

  
2159
    if not newbie_singlehomed:
2160
      # check reachability from my secondary ip to newbie's secondary ip
2161
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2162
                              source=myself.secondary_ip):
2163
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2164
                                   " based ping to node daemon port",
2165
                                   errors.ECODE_ENVIRON)
2166

  
2167
    if self.op.readd:
2168
      exceptions = [node]
2169
    else:
2170
      exceptions = []
2171

  
2172
    if self.op.master_capable:
2173
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
2174
    else:
2175
      self.master_candidate = False
2176

  
2177
    if self.op.readd:
2178
      self.new_node = old_node
2179
    else:
2180
      node_group = cfg.LookupNodeGroup(self.op.group)
2181
      self.new_node = objects.Node(name=node,
2182
                                   primary_ip=primary_ip,
2183
                                   secondary_ip=secondary_ip,
2184
                                   master_candidate=self.master_candidate,
2185
                                   offline=False, drained=False,
2186
                                   group=node_group, ndparams={})
2187

  
2188
    if self.op.ndparams:
2189
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
2190
      _CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
2191
                            "node", "cluster or group")
2192

  
2193
    if self.op.hv_state:
2194
      self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state, None)
2195

  
2196
    if self.op.disk_state:
2197
      self.new_disk_state = _MergeAndVerifyDiskState(self.op.disk_state, None)
2198

  
2199
    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
2200
    #       it a property on the base class.
2201
    rpcrunner = rpc.DnsOnlyRunner()
2202
    result = rpcrunner.call_version([node])[node]
2203
    result.Raise("Can't get version information from node %s" % node)
2204
    if constants.PROTOCOL_VERSION == result.payload:
2205
      logging.info("Communication to node %s fine, sw version %s match",
2206
                   node, result.payload)
2207
    else:
2208
      raise errors.OpPrereqError("Version mismatch master version %s,"
2209
                                 " node version %s" %
2210
                                 (constants.PROTOCOL_VERSION, result.payload),
2211
                                 errors.ECODE_ENVIRON)
2212

  
2213
    vg_name = cfg.GetVGName()
2214
    if vg_name is not None:
2215
      vparams = {constants.NV_PVLIST: [vg_name]}
2216
      excl_stor = _IsExclusiveStorageEnabledNode(cfg, self.new_node)
2217
      cname = self.cfg.GetClusterName()
2218
      result = rpcrunner.call_node_verify_light([node], vparams, cname)[node]
2219
      (errmsgs, _) = _CheckNodePVs(result.payload, excl_stor)
2220
      if errmsgs:
2221
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
2222
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)
2223

  
2224
  def Exec(self, feedback_fn):
2225
    """Adds the new node to the cluster.
2226

  
2227
    """
2228
    new_node = self.new_node
2229
    node = new_node.name
2230

  
2231
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
2232
      "Not owning BGL"
2233

  
2234
    # We adding a new node so we assume it's powered
2235
    new_node.powered = True
2236

  
2237
    # for re-adds, reset the offline/drained/master-candidate flags;
2238
    # we need to reset here, otherwise offline would prevent RPC calls
2239
    # later in the procedure; this also means that if the re-add
2240
    # fails, we are left with a non-offlined, broken node
2241
    if self.op.readd:
2242
      new_node.drained = new_node.offline = False # pylint: disable=W0201
2243
      self.LogInfo("Readding a node, the offline/drained flags were reset")
2244
      # if we demote the node, we do cleanup later in the procedure
2245
      new_node.master_candidate = self.master_candidate
2246
      if self.changed_primary_ip:
2247
        new_node.primary_ip = self.op.primary_ip
2248

  
2249
    # copy the master/vm_capable flags
2250
    for attr in self._NFLAGS:
2251
      setattr(new_node, attr, getattr(self.op, attr))
2252

  
2253
    # notify the user about any possible mc promotion
2254
    if new_node.master_candidate:
2255
      self.LogInfo("Node will be a master candidate")
2256

  
2257
    if self.op.ndparams:
2258
      new_node.ndparams = self.op.ndparams
2259
    else:
2260
      new_node.ndparams = {}
2261

  
2262
    if self.op.hv_state:
2263
      new_node.hv_state_static = self.new_hv_state
2264

  
2265
    if self.op.disk_state:
2266
      new_node.disk_state_static = self.new_disk_state
2267

  
2268
    # Add node to our /etc/hosts, and add key to known_hosts
2269
    if self.cfg.GetClusterInfo().modify_etc_hosts:
2270
      master_node = self.cfg.GetMasterNode()
2271
      result = self.rpc.call_etc_hosts_modify(master_node,
2272
                                              constants.ETC_HOSTS_ADD,
2273
                                              self.hostname.name,
2274
                                              self.hostname.ip)
2275
      result.Raise("Can't update hosts file with new host data")
2276

  
2277
    if new_node.secondary_ip != new_node.primary_ip:
2278
      _CheckNodeHasSecondaryIP(self, new_node.name, new_node.secondary_ip,
2279
                               False)
2280

  
2281
    node_verify_list = [self.cfg.GetMasterNode()]
2282
    node_verify_param = {
2283
      constants.NV_NODELIST: ([node], {}),
2284
      # TODO: do a node-net-test as well?
2285
    }
2286

  
2287
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2288
                                       self.cfg.GetClusterName())
2289
    for verifier in node_verify_list:
2290
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
2291
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
2292
      if nl_payload:
2293
        for failed in nl_payload:
2294
          feedback_fn("ssh/hostname verification failed"
2295
                      " (checking from %s): %s" %
2296
                      (verifier, nl_payload[failed]))
2297
        raise errors.OpExecError("ssh/hostname verification failed")
2298

  
2299
    if self.op.readd:
2300
      _RedistributeAncillaryFiles(self)
2301
      self.context.ReaddNode(new_node)
2302
      # make sure we redistribute the config
2303
      self.cfg.Update(new_node, feedback_fn)
2304
      # and make sure the new node will not have old files around
2305
      if not new_node.master_candidate:
2306
        result = self.rpc.call_node_demote_from_mc(new_node.name)
2307
        msg = result.fail_msg
2308
        if msg:
2309
          self.LogWarning("Node failed to demote itself from master"
2310
                          " candidate status: %s" % msg)
2311
    else:
2312
      _RedistributeAncillaryFiles(self, additional_nodes=[node],
2313
                                  additional_vm=self.op.vm_capable)
2314
      self.context.AddNode(new_node, self.proc.GetECId())
2315

  
2316

  
2317
class LUNodeSetParams(LogicalUnit):
2318
  """Modifies the parameters of a node.
2319

  
2320
  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
2321
      to the node role (as _ROLE_*)
2322
  @cvar _R2F: a dictionary from node role to tuples of flags
2323
  @cvar _FLAGS: a list of attribute names corresponding to the flags
2324

  
2325
  """
2326
  HPATH = "node-modify"
2327
  HTYPE = constants.HTYPE_NODE
2328
  REQ_BGL = False
2329
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
2330
  _F2R = {
2331
    (True, False, False): _ROLE_CANDIDATE,
2332
    (False, True, False): _ROLE_DRAINED,
2333
    (False, False, True): _ROLE_OFFLINE,
2334
    (False, False, False): _ROLE_REGULAR,
2335
    }
2336
  _R2F = dict((v, k) for k, v in _F2R.items())
2337
  _FLAGS = ["master_candidate", "drained", "offline"]
2338

  
2339
  def CheckArguments(self):
2340
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
2341
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
2342
                self.op.master_capable, self.op.vm_capable,
2343
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
2344
                self.op.disk_state]
2345
    if all_mods.count(None) == len(all_mods):
2346
      raise errors.OpPrereqError("Please pass at least one modification",
2347
                                 errors.ECODE_INVAL)
2348
    if all_mods.count(True) > 1:
2349
      raise errors.OpPrereqError("Can't set the node into more than one"
2350
                                 " state at the same time",
2351
                                 errors.ECODE_INVAL)
2352

  
2353
    # Boolean value that tells us whether we might be demoting from MC
2354
    self.might_demote = (self.op.master_candidate is False or
2355
                         self.op.offline is True or
2356
                         self.op.drained is True or
2357
                         self.op.master_capable is False)
2358

  
2359
    if self.op.secondary_ip:
2360
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
2361
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
2362
                                   " address" % self.op.secondary_ip,
2363
                                   errors.ECODE_INVAL)
2364

  
2365
    self.lock_all = self.op.auto_promote and self.might_demote
2366
    self.lock_instances = self.op.secondary_ip is not None
2367

  
2368
  def _InstanceFilter(self, instance):
2369
    """Filter for getting affected instances.
2370

  
2371
    """
2372
    return (instance.disk_template in constants.DTS_INT_MIRROR and
2373
            self.op.node_name in instance.all_nodes)
2374

  
2375
  def ExpandNames(self):
2376
    if self.lock_all:
2377
      self.needed_locks = {
2378
        locking.LEVEL_NODE: locking.ALL_SET,
2379

  
2380
        # Block allocations when all nodes are locked
2381
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
2382
        }
2383
    else:
2384
      self.needed_locks = {
2385
        locking.LEVEL_NODE: self.op.node_name,
2386
        }
2387

  
2388
    # Since modifying a node can have severe effects on currently running
2389
    # operations the resource lock is at least acquired in shared mode
2390
    self.needed_locks[locking.LEVEL_NODE_RES] = \
2391
      self.needed_locks[locking.LEVEL_NODE]
2392

  
2393
    # Get all locks except nodes in shared mode; they are not used for anything
2394
    # but read-only access
2395
    self.share_locks = _ShareAll()
2396
    self.share_locks[locking.LEVEL_NODE] = 0
2397
    self.share_locks[locking.LEVEL_NODE_RES] = 0
2398
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 0
2399

  
2400
    if self.lock_instances:
2401
      self.needed_locks[locking.LEVEL_INSTANCE] = \
2402
        frozenset(self.cfg.GetInstancesInfoByFilter(self._InstanceFilter))
2403

  
2404
  def BuildHooksEnv(self):
2405
    """Build hooks env.
2406

  
2407
    This runs on the master node.
2408

  
2409
    """
2410
    return {
2411
      "OP_TARGET": self.op.node_name,
2412
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2413
      "OFFLINE": str(self.op.offline),
2414
      "DRAINED": str(self.op.drained),
2415
      "MASTER_CAPABLE": str(self.op.master_capable),
2416
      "VM_CAPABLE": str(self.op.vm_capable),
2417
      }
2418

  
2419
  def BuildHooksNodes(self):
2420
    """Build hooks nodes.
2421

  
2422
    """
2423
    nl = [self.cfg.GetMasterNode(), self.op.node_name]
2424
    return (nl, nl)
2425

  
2426
  def CheckPrereq(self):
2427
    """Check prerequisites.
1256
    for instance_name in owned_instances:
1257
      _CheckInstanceNodeGroups(lu.cfg, instance_name, owned_groups)
2428 1258

  
2429
    This only checks the instance list against the existing names.
1259
  def _GetQueryData(self, lu):
1260
    """Computes the list of instances and their attributes.
2430 1261

  
2431 1262
    """
2432
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
1263
    if self.do_grouplocks:
1264
      self._CheckGroupLocks(lu)
2433 1265

  
2434
    if self.lock_instances:
2435
      affected_instances = \
2436
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
1266
    cluster = lu.cfg.GetClusterInfo()
1267
    all_info = lu.cfg.GetAllInstancesInfo()
2437 1268

  
2438
      # Verify instance locks
2439
      owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
2440
      wanted_instances = frozenset(affected_instances.keys())
2441
      if wanted_instances - owned_instances:
2442
        raise errors.OpPrereqError("Instances affected by changing node %s's"
2443
                                   " secondary IP address have changed since"
2444
                                   " locks were acquired, wanted '%s', have"
2445
                                   " '%s'; retry the operation" %
2446
                                   (self.op.node_name,
2447
                                    utils.CommaJoin(wanted_instances),
2448
                                    utils.CommaJoin(owned_instances)),
2449
                                   errors.ECODE_STATE)
2450
    else:
2451
      affected_instances = None
2452

  
2453
    if (self.op.master_candidate is not None or
2454
        self.op.drained is not None or
2455
        self.op.offline is not None):
2456
      # we can't change the master's node flags
2457
      if self.op.node_name == self.cfg.GetMasterNode():
2458
        raise errors.OpPrereqError("The master role can be changed"
2459
                                   " only via master-failover",
2460
                                   errors.ECODE_INVAL)
1269
    instance_names = self._GetNames(lu, all_info.keys(), locking.LEVEL_INSTANCE)
2461 1270

  
2462
    if self.op.master_candidate and not node.master_capable:
2463
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
2464
                                 " it a master candidate" % node.name,
2465
                                 errors.ECODE_STATE)
1271
    instance_list = [all_info[name] for name in instance_names]
1272
    nodes = frozenset(itertools.chain(*(inst.all_nodes
1273
                                        for inst in instance_list)))
1274
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
1275
    bad_nodes = []
1276
    offline_nodes = []
1277
    wrongnode_inst = set()
2466 1278

  
2467
    if self.op.vm_capable is False:
2468
      (ipri, isec) = self.cfg.GetNodeInstances(self.op.node_name)
2469
      if ipri or isec:
2470
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
2471
                                   " the vm_capable flag" % node.name,
2472
                                   errors.ECODE_STATE)
1279
    # Gather data as requested
1280
    if self.requested_data & set([query.IQ_LIVE, query.IQ_CONSOLE]):
1281
      live_data = {}
1282
      node_data = lu.rpc.call_all_instances_info(nodes, hv_list)
1283
      for name in nodes:
1284
        result = node_data[name]
1285
        if result.offline:
1286
          # offline nodes will be in both lists
1287
          assert result.fail_msg
1288
          offline_nodes.append(name)
1289
        if result.fail_msg:
1290
          bad_nodes.append(name)
1291
        elif result.payload:
1292
          for inst in result.payload:
1293
            if inst in all_info:
1294
              if all_info[inst].primary_node == name:
1295
                live_data.update(result.payload)
1296
              else:
1297
                wrongnode_inst.add(inst)
1298
            else:
1299
              # orphan instance; we don't list it here as we don't
1300
              # handle this case yet in the output of instance listing
1301
              logging.warning("Orphan instance '%s' found on node %s",
1302
                              inst, name)
1303
        # else no instance is alive
1304
    else:
1305
      live_data = {}
2473 1306

  
2474
    if node.master_candidate and self.might_demote and not self.lock_all:
2475
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
2476
      # check if after removing the current node, we're missing master
2477
      # candidates
2478
      (mc_remaining, mc_should, _) = \
2479
          self.cfg.GetMasterCandidateStats(exceptions=[node.name])
2480
      if mc_remaining < mc_should:
2481
        raise errors.OpPrereqError("Not enough master candidates, please"
2482
                                   " pass auto promote option to allow"
2483
                                   " promotion (--auto-promote or RAPI"
2484
                                   " auto_promote=True)", errors.ECODE_STATE)
2485

  
2486
    self.old_flags = old_flags = (node.master_candidate,
2487
                                  node.drained, node.offline)
2488
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
2489
    self.old_role = old_role = self._F2R[old_flags]
2490

  
2491
    # Check for ineffective changes
2492
    for attr in self._FLAGS:
2493
      if (getattr(self.op, attr) is False and getattr(node, attr) is False):
2494
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
2495
        setattr(self.op, attr, None)
2496

  
2497
    # Past this point, any flag change to False means a transition
2498
    # away from the respective state, as only real changes are kept
2499

  
2500
    # TODO: We might query the real power state if it supports OOB
2501
    if _SupportsOob(self.cfg, node):
2502
      if self.op.offline is False and not (node.powered or
2503
                                           self.op.powered is True):
2504
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
2505
                                    " offline status can be reset") %
2506
                                   self.op.node_name, errors.ECODE_STATE)
2507
    elif self.op.powered is not None:
2508
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
2509
                                  " as it does not support out-of-band"
2510
                                  " handling") % self.op.node_name,
2511
                                 errors.ECODE_STATE)
1307
    if query.IQ_DISKUSAGE in self.requested_data:
1308
      gmi = ganeti.masterd.instance
1309
      disk_usage = dict((inst.name,
1310
                         gmi.ComputeDiskSize(inst.disk_template,
1311
                                             [{constants.IDISK_SIZE: disk.size}
1312
                                              for disk in inst.disks]))
1313
                        for inst in instance_list)
1314
    else:
1315
      disk_usage = None
2512 1316

  
2513
    # If we're being deofflined/drained, we'll MC ourself if needed
2514
    if (self.op.drained is False or self.op.offline is False or
2515
        (self.op.master_capable and not node.master_capable)):
2516
      if _DecideSelfPromotion(self):
2517
        self.op.master_candidate = True
2518
        self.LogInfo("Auto-promoting node to master candidate")
2519

  
2520
    # If we're no longer master capable, we'll demote ourselves from MC
2521
    if self.op.master_capable is False and node.master_candidate:
2522
      self.LogInfo("Demoting from master candidate")
2523
      self.op.master_candidate = False
2524

  
2525
    # Compute new role
2526
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
2527
    if self.op.master_candidate:
2528
      new_role = self._ROLE_CANDIDATE
2529
    elif self.op.drained:
2530
      new_role = self._ROLE_DRAINED
2531
    elif self.op.offline:
2532
      new_role = self._ROLE_OFFLINE
2533
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
2534
      # False is still in new flags, which means we're un-setting (the
2535
      # only) True flag
2536
      new_role = self._ROLE_REGULAR
2537
    else: # no new flags, nothing, keep old role
2538
      new_role = old_role
2539

  
2540
    self.new_role = new_role
2541

  
2542
    if old_role == self._ROLE_OFFLINE and new_role != old_role:
2543
      # Trying to transition out of offline status
2544
      result = self.rpc.call_version([node.name])[node.name]
2545
      if result.fail_msg:
2546
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
2547
                                   " to report its version: %s" %
2548
                                   (node.name, result.fail_msg),
2549
                                   errors.ECODE_STATE)
2550
      else:
2551
        self.LogWarning("Transitioning node from offline to online state"
2552
                        " without using re-add. Please make sure the node"
2553
                        " is healthy!")
2554

  
2555
    # When changing the secondary ip, verify if this is a single-homed to
2556
    # multi-homed transition or vice versa, and apply the relevant
2557
    # restrictions.
2558
    if self.op.secondary_ip:
2559
      # Ok even without locking, because this can't be changed by any LU
2560
      master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2561
      master_singlehomed = master.secondary_ip == master.primary_ip
2562
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
2563
        if self.op.force and node.name == master.name:
2564
          self.LogWarning("Transitioning from single-homed to multi-homed"
2565
                          " cluster; all nodes will require a secondary IP"
2566
                          " address")
2567
        else:
2568
          raise errors.OpPrereqError("Changing the secondary ip on a"
2569
                                     " single-homed cluster requires the"
2570
                                     " --force option to be passed, and the"
2571
                                     " target node to be the master",
2572
                                     errors.ECODE_INVAL)
2573
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
2574
        if self.op.force and node.name == master.name:
2575
          self.LogWarning("Transitioning from multi-homed to single-homed"
2576
                          " cluster; secondary IP addresses will have to be"
2577
                          " removed")
1317
    if query.IQ_CONSOLE in self.requested_data:
1318
      consinfo = {}
1319
      for inst in instance_list:
1320
        if inst.name in live_data:
1321
          # Instance is running
... This diff was truncated because it exceeds the maximum size that can be displayed.
