Extract node related logical units from cmdlib
author Thomas Thrainer <thomasth@google.com>
Mon, 13 May 2013 13:16:27 +0000 (15:16 +0200)
committer Thomas Thrainer <thomasth@google.com>
Fri, 17 May 2013 09:31:55 +0000 (11:31 +0200)
All LUNode* classes are extracted to node.py. Functions that are also used by
non-node logical units are moved to common.py.

Signed-off-by: Thomas Thrainer <thomasth@google.com>
Reviewed-by: Bernardo Dal Seno <bdalseno@google.com>

Makefile.am
lib/cmdlib/__init__.py
lib/cmdlib/common.py
lib/cmdlib/node.py [new file with mode: 0644]
test/py/ganeti.cmdlib_unittest.py
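
The patch only moves code between modules: the node logical units now live in
lib/cmdlib/node.py and the shared helpers in lib/cmdlib/common.py. As a minimal
sketch (not part of the patch), code written against the new layout would import
them roughly as follows; the names are taken from the import hunks shown below:

  # Hypothetical consumer of the new module layout; the import targets
  # mirror the lines added to lib/cmdlib/__init__.py in this patch.
  from ganeti.cmdlib.node import LUNodeAdd, LUNodeRemove, LUNodeSetParams
  from ganeti.cmdlib.common import _CheckInstanceState, _CheckIAllocatorOrNode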

Makefile.am
index 594fe31..cc9d2d2 100644
@@ -313,6 +313,7 @@ cmdlib_PYTHON = \
        lib/cmdlib/base.py \
        lib/cmdlib/cluster.py \
        lib/cmdlib/group.py \
+       lib/cmdlib/node.py \
        lib/cmdlib/tags.py \
        lib/cmdlib/network.py \
        lib/cmdlib/test.py
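
The lib/cmdlib/__init__.py diff that follows removes the node-related code and
re-imports it from the new modules (the helpers from ganeti.cmdlib.common, the
LUNode* classes from ganeti.cmdlib.node), so existing references through the
package namespace should keep resolving. A quick interactive check, assuming a
built source tree (not part of the patch):

  >>> from ganeti.cmdlib import LUNodeAdd, LUNodeRemove
  >>> LUNodeAdd.__module__
  'ganeti.cmdlib.node'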
lib/cmdlib/__init__.py
index 705038b..f37230d 100644
@@ -56,7 +56,9 @@ from ganeti.masterd import iallocator
 
 from ganeti.cmdlib.base import ResultWithJobs, LogicalUnit, NoHooksLU, \
   Tasklet, _QueryBase
-from ganeti.cmdlib.common import _ExpandInstanceName, _ExpandItemName, \
+from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_ONLINE, \
+  INSTANCE_NOT_RUNNING, CAN_CHANGE_INSTANCE_OFFLINE, \
+  _ExpandInstanceName, _ExpandItemName, \
   _ExpandNodeName, _ShareAll, _CheckNodeGroupInstances, _GetWantedNodes, \
   _GetWantedInstances, _RunPostHook, _RedistributeAncillaryFiles, \
   _MergeAndVerifyHvState, _MergeAndVerifyDiskState, _GetUpdatedIPolicy, \
@@ -65,7 +67,9 @@ from ganeti.cmdlib.common import _ExpandInstanceName, _ExpandItemName, \
   _ComputeIPolicyInstanceViolation, _AnnotateDiskParams, _SupportsOob, \
   _ComputeIPolicySpecViolation, _GetDefaultIAllocator, \
   _CheckInstancesNodeGroups, _LoadNodeEvacResult, _MapInstanceDisksToNodes, \
-  _CheckInstanceNodeGroups
+  _CheckInstanceNodeGroups, _CheckParamsNotGlobal, \
+  _IsExclusiveStorageEnabledNode, _CheckInstanceState, \
+  _CheckIAllocatorOrNode, _FindFaultyInstanceDisks
 
 from ganeti.cmdlib.cluster import LUClusterActivateMasterIp, \
   LUClusterDeactivateMasterIp, LUClusterConfigQuery, LUClusterDestroy, \
@@ -76,6 +80,10 @@ from ganeti.cmdlib.cluster import LUClusterActivateMasterIp, \
 from ganeti.cmdlib.group import LUGroupAdd, LUGroupAssignNodes, \
   _GroupQuery, LUGroupQuery, LUGroupSetParams, LUGroupRemove, \
   LUGroupRename, LUGroupEvacuate, LUGroupVerifyDisks
+from ganeti.cmdlib.node import LUNodeAdd, LUNodeSetParams, \
+  LUNodePowercycle, LUNodeEvacuate, LUNodeMigrate, LUNodeModifyStorage, \
+  _NodeQuery, LUNodeQuery, LUNodeQueryvols, LUNodeQueryStorage, \
+  LUNodeRemove, LURepairNodeStorage
 from ganeti.cmdlib.tags import LUTagsGet, LUTagsSearch, LUTagsSet, LUTagsDel
 from ganeti.cmdlib.network import LUNetworkAdd, LUNetworkRemove, \
   LUNetworkSetParams, _NetworkQuery, LUNetworkQuery, LUNetworkConnect, \
@@ -85,31 +93,6 @@ from ganeti.cmdlib.test import LUTestDelay, LUTestJqueue, LUTestAllocator
 import ganeti.masterd.instance # pylint: disable=W0611
 
 
-# States of instance
-INSTANCE_DOWN = [constants.ADMINST_DOWN]
-INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
-INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE]
-
-#: Instance status in which an instance can be marked as offline/online
-CAN_CHANGE_INSTANCE_OFFLINE = (frozenset(INSTANCE_DOWN) | frozenset([
-  constants.ADMINST_OFFLINE,
-  ]))
-
-
-def _IsExclusiveStorageEnabledNode(cfg, node):
-  """Whether exclusive_storage is in effect for the given node.
-
-  @type cfg: L{config.ConfigWriter}
-  @param cfg: The cluster configuration
-  @type node: L{objects.Node}
-  @param node: The node
-  @rtype: bool
-  @return: The effective value of exclusive_storage
-
-  """
-  return cfg.GetNdParams(node)[constants.ND_EXCLUSIVE_STORAGE]
-
-
 def _IsExclusiveStorageEnabledNodeName(cfg, nodename):
   """Whether exclusive_storage is in effect for the given node.
 
@@ -191,54 +174,6 @@ def _ReleaseLocks(lu, level, names=None, keep=None):
     assert not lu.glm.is_owned(level), "No locks should be owned"
 
 
-def _CheckOutputFields(static, dynamic, selected):
-  """Checks whether all selected fields are valid.
-
-  @type static: L{utils.FieldSet}
-  @param static: static fields set
-  @type dynamic: L{utils.FieldSet}
-  @param dynamic: dynamic fields set
-
-  """
-  f = utils.FieldSet()
-  f.Extend(static)
-  f.Extend(dynamic)
-
-  delta = f.NonMatching(selected)
-  if delta:
-    raise errors.OpPrereqError("Unknown output fields selected: %s"
-                               % ",".join(delta), errors.ECODE_INVAL)
-
-
-def _CheckParamsNotGlobal(params, glob_pars, kind, bad_levels, good_levels):
-  """Make sure that none of the given paramters is global.
-
-  If a global parameter is found, an L{errors.OpPrereqError} exception is
-  raised. This is used to avoid setting global parameters for individual nodes.
-
-  @type params: dictionary
-  @param params: Parameters to check
-  @type glob_pars: dictionary
-  @param glob_pars: Forbidden parameters
-  @type kind: string
-  @param kind: Kind of parameters (e.g. "node")
-  @type bad_levels: string
-  @param bad_levels: Level(s) at which the parameters are forbidden (e.g.
-      "instance")
-  @type good_levels: strings
-  @param good_levels: Level(s) at which the parameters are allowed (e.g.
-      "cluster or group")
-
-  """
-  used_globals = glob_pars.intersection(params)
-  if used_globals:
-    msg = ("The following %s parameters are global and cannot"
-           " be customized at %s level, please modify them at"
-           " %s level: %s" %
-           (kind, bad_levels, good_levels, utils.CommaJoin(used_globals)))
-    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
-
-
 def _CheckNodeOnline(lu, node, msg=None):
   """Ensure that a given node is online.
 
@@ -298,33 +233,6 @@ def _CheckNodeHasOS(lu, node, os_name, force_variant):
     _CheckOSVariant(result.payload, os_name)
 
 
-def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
-  """Ensure that a node has the given secondary ip.
-
-  @type lu: L{LogicalUnit}
-  @param lu: the LU on behalf of which we make the check
-  @type node: string
-  @param node: the node to check
-  @type secondary_ip: string
-  @param secondary_ip: the ip to check
-  @type prereq: boolean
-  @param prereq: whether to throw a prerequisite or an execute error
-  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
-  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
-
-  """
-  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
-  result.Raise("Failure checking secondary ip on node %s" % node,
-               prereq=prereq, ecode=errors.ECODE_ENVIRON)
-  if not result.payload:
-    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
-           " please fix and re-run this command" % secondary_ip)
-    if prereq:
-      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
-    else:
-      raise errors.OpExecError(msg)
-
-
 def _GetClusterDomainSecret():
   """Reads the cluster domain secret.
 
@@ -333,37 +241,6 @@ def _GetClusterDomainSecret():
                                strict=True)
 
 
-def _CheckInstanceState(lu, instance, req_states, msg=None):
-  """Ensure that an instance is in one of the required states.
-
-  @param lu: the LU on behalf of which we make the check
-  @param instance: the instance to check
-  @param msg: if passed, should be a message to replace the default one
-  @raise errors.OpPrereqError: if the instance is not in the required state
-
-  """
-  if msg is None:
-    msg = ("can't use instance from outside %s states" %
-           utils.CommaJoin(req_states))
-  if instance.admin_state not in req_states:
-    raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" %
-                               (instance.name, instance.admin_state, msg),
-                               errors.ECODE_STATE)
-
-  if constants.ADMINST_UP not in req_states:
-    pnode = instance.primary_node
-    if not lu.cfg.GetNodeInfo(pnode).offline:
-      ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
-      ins_l.Raise("Can't contact node %s for instance information" % pnode,
-                  prereq=True, ecode=errors.ECODE_ENVIRON)
-      if instance.name in ins_l.payload:
-        raise errors.OpPrereqError("Instance %s is running, %s" %
-                                   (instance.name, msg), errors.ECODE_STATE)
-    else:
-      lu.LogWarning("Primary node offline, ignoring check that instance"
-                     " is down")
-
-
 def _ComputeIPolicyInstanceSpecViolation(
   ipolicy, instance_spec, disk_template,
   _compute_fn=_ComputeIPolicySpecViolation):
@@ -621,17 +498,6 @@ def _BuildInstanceHookEnvByObject(lu, instance, override=None):
   return _BuildInstanceHookEnv(**args) # pylint: disable=W0142
 
 
-def _DecideSelfPromotion(lu, exceptions=None):
-  """Decide whether I should promote myself as a master candidate.
-
-  """
-  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
-  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
-  # the new node will increase mc_max with one, so:
-  mc_should = min(mc_should + 1, cp_size)
-  return mc_now < mc_should
-
-
 def _CheckNicsBridgesExist(lu, target_nics, target_node):
   """Check that the brigdes needed by a list of nics exist.
 
@@ -679,100 +545,6 @@ def _CheckOSVariant(os_obj, name):
     raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
 
 
-def _GetNodeInstancesInner(cfg, fn):
-  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
-
-
-def _GetNodeInstances(cfg, node_name):
-  """Returns a list of all primary and secondary instances on a node.
-
-  """
-
-  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
-
-
-def _GetNodePrimaryInstances(cfg, node_name):
-  """Returns primary instances on a node.
-
-  """
-  return _GetNodeInstancesInner(cfg,
-                                lambda inst: node_name == inst.primary_node)
-
-
-def _GetNodeSecondaryInstances(cfg, node_name):
-  """Returns secondary instances on a node.
-
-  """
-  return _GetNodeInstancesInner(cfg,
-                                lambda inst: node_name in inst.secondary_nodes)
-
-
-def _GetStorageTypeArgs(cfg, storage_type):
-  """Returns the arguments for a storage type.
-
-  """
-  # Special case for file storage
-  if storage_type == constants.ST_FILE:
-    # storage.FileStorage wants a list of storage directories
-    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
-
-  return []
-
-
-def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq):
-  faulty = []
-
-  for dev in instance.disks:
-    cfg.SetDiskID(dev, node_name)
-
-  result = rpc_runner.call_blockdev_getmirrorstatus(node_name, (instance.disks,
-                                                                instance))
-  result.Raise("Failed to get disk status from node %s" % node_name,
-               prereq=prereq, ecode=errors.ECODE_ENVIRON)
-
-  for idx, bdev_status in enumerate(result.payload):
-    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
-      faulty.append(idx)
-
-  return faulty
-
-
-def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
-  """Check the sanity of iallocator and node arguments and use the
-  cluster-wide iallocator if appropriate.
-
-  Check that at most one of (iallocator, node) is specified. If none is
-  specified, or the iallocator is L{constants.DEFAULT_IALLOCATOR_SHORTCUT},
-  then the LU's opcode's iallocator slot is filled with the cluster-wide
-  default iallocator.
-
-  @type iallocator_slot: string
-  @param iallocator_slot: the name of the opcode iallocator slot
-  @type node_slot: string
-  @param node_slot: the name of the opcode target node slot
-
-  """
-  node = getattr(lu.op, node_slot, None)
-  ialloc = getattr(lu.op, iallocator_slot, None)
-  if node == []:
-    node = None
-
-  if node is not None and ialloc is not None:
-    raise errors.OpPrereqError("Do not specify both, iallocator and node",
-                               errors.ECODE_INVAL)
-  elif ((node is None and ialloc is None) or
-        ialloc == constants.DEFAULT_IALLOCATOR_SHORTCUT):
-    default_iallocator = lu.cfg.GetDefaultIAllocator()
-    if default_iallocator:
-      setattr(lu.op, iallocator_slot, default_iallocator)
-    else:
-      raise errors.OpPrereqError("No iallocator or node given and no"
-                                 " cluster-wide default iallocator found;"
-                                 " please specify either an iallocator or a"
-                                 " node, or set a cluster-wide default"
-                                 " iallocator", errors.ECODE_INVAL)
-
-
 def _CheckHostnameSane(lu, name):
   """Ensures that a given hostname resolves to a 'sane' name.
 
@@ -1431,358 +1203,6 @@ class LUExtStorageDiagnose(NoHooksLU):
     return self.eq.OldStyleQuery(self)
 
 
-class LUNodeRemove(LogicalUnit):
-  """Logical unit for removing a node.
-
-  """
-  HPATH = "node-remove"
-  HTYPE = constants.HTYPE_NODE
-
-  def BuildHooksEnv(self):
-    """Build hooks env.
-
-    """
-    return {
-      "OP_TARGET": self.op.node_name,
-      "NODE_NAME": self.op.node_name,
-      }
-
-  def BuildHooksNodes(self):
-    """Build hooks nodes.
-
-    This doesn't run on the target node in the pre phase as a failed
-    node would then be impossible to remove.
-
-    """
-    all_nodes = self.cfg.GetNodeList()
-    try:
-      all_nodes.remove(self.op.node_name)
-    except ValueError:
-      pass
-    return (all_nodes, all_nodes)
-
-  def CheckPrereq(self):
-    """Check prerequisites.
-
-    This checks:
-     - the node exists in the configuration
-     - it does not have primary or secondary instances
-     - it's not the master
-
-    Any errors are signaled by raising errors.OpPrereqError.
-
-    """
-    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
-    node = self.cfg.GetNodeInfo(self.op.node_name)
-    assert node is not None
-
-    masternode = self.cfg.GetMasterNode()
-    if node.name == masternode:
-      raise errors.OpPrereqError("Node is the master node, failover to another"
-                                 " node is required", errors.ECODE_INVAL)
-
-    for instance_name, instance in self.cfg.GetAllInstancesInfo().items():
-      if node.name in instance.all_nodes:
-        raise errors.OpPrereqError("Instance %s is still running on the node,"
-                                   " please remove first" % instance_name,
-                                   errors.ECODE_INVAL)
-    self.op.node_name = node.name
-    self.node = node
-
-  def Exec(self, feedback_fn):
-    """Removes the node from the cluster.
-
-    """
-    node = self.node
-    logging.info("Stopping the node daemon and removing configs from node %s",
-                 node.name)
-
-    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
-
-    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
-      "Not owning BGL"
-
-    # Promote nodes to master candidate as needed
-    _AdjustCandidatePool(self, exceptions=[node.name])
-    self.context.RemoveNode(node.name)
-
-    # Run post hooks on the node before it's removed
-    _RunPostHook(self, node.name)
-
-    result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
-    msg = result.fail_msg
-    if msg:
-      self.LogWarning("Errors encountered on the remote node while leaving"
-                      " the cluster: %s", msg)
-
-    # Remove node from our /etc/hosts
-    if self.cfg.GetClusterInfo().modify_etc_hosts:
-      master_node = self.cfg.GetMasterNode()
-      result = self.rpc.call_etc_hosts_modify(master_node,
-                                              constants.ETC_HOSTS_REMOVE,
-                                              node.name, None)
-      result.Raise("Can't update hosts file with new host data")
-      _RedistributeAncillaryFiles(self)
-
-
-class _NodeQuery(_QueryBase):
-  FIELDS = query.NODE_FIELDS
-
-  def ExpandNames(self, lu):
-    lu.needed_locks = {}
-    lu.share_locks = _ShareAll()
-
-    if self.names:
-      self.wanted = _GetWantedNodes(lu, self.names)
-    else:
-      self.wanted = locking.ALL_SET
-
-    self.do_locking = (self.use_locking and
-                       query.NQ_LIVE in self.requested_data)
-
-    if self.do_locking:
-      # If any non-static field is requested we need to lock the nodes
-      lu.needed_locks[locking.LEVEL_NODE] = self.wanted
-      lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
-
-  def DeclareLocks(self, lu, level):
-    pass
-
-  def _GetQueryData(self, lu):
-    """Computes the list of nodes and their attributes.
-
-    """
-    all_info = lu.cfg.GetAllNodesInfo()
-
-    nodenames = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)
-
-    # Gather data as requested
-    if query.NQ_LIVE in self.requested_data:
-      # filter out non-vm_capable nodes
-      toquery_nodes = [name for name in nodenames if all_info[name].vm_capable]
-
-      es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, toquery_nodes)
-      node_data = lu.rpc.call_node_info(toquery_nodes, [lu.cfg.GetVGName()],
-                                        [lu.cfg.GetHypervisorType()], es_flags)
-      live_data = dict((name, rpc.MakeLegacyNodeInfo(nresult.payload))
-                       for (name, nresult) in node_data.items()
-                       if not nresult.fail_msg and nresult.payload)
-    else:
-      live_data = None
-
-    if query.NQ_INST in self.requested_data:
-      node_to_primary = dict([(name, set()) for name in nodenames])
-      node_to_secondary = dict([(name, set()) for name in nodenames])
-
-      inst_data = lu.cfg.GetAllInstancesInfo()
-
-      for inst in inst_data.values():
-        if inst.primary_node in node_to_primary:
-          node_to_primary[inst.primary_node].add(inst.name)
-        for secnode in inst.secondary_nodes:
-          if secnode in node_to_secondary:
-            node_to_secondary[secnode].add(inst.name)
-    else:
-      node_to_primary = None
-      node_to_secondary = None
-
-    if query.NQ_OOB in self.requested_data:
-      oob_support = dict((name, bool(_SupportsOob(lu.cfg, node)))
-                         for name, node in all_info.iteritems())
-    else:
-      oob_support = None
-
-    if query.NQ_GROUP in self.requested_data:
-      groups = lu.cfg.GetAllNodeGroupsInfo()
-    else:
-      groups = {}
-
-    return query.NodeQueryData([all_info[name] for name in nodenames],
-                               live_data, lu.cfg.GetMasterNode(),
-                               node_to_primary, node_to_secondary, groups,
-                               oob_support, lu.cfg.GetClusterInfo())
-
-
-class LUNodeQuery(NoHooksLU):
-  """Logical unit for querying nodes.
-
-  """
-  # pylint: disable=W0142
-  REQ_BGL = False
-
-  def CheckArguments(self):
-    self.nq = _NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
-                         self.op.output_fields, self.op.use_locking)
-
-  def ExpandNames(self):
-    self.nq.ExpandNames(self)
-
-  def DeclareLocks(self, level):
-    self.nq.DeclareLocks(self, level)
-
-  def Exec(self, feedback_fn):
-    return self.nq.OldStyleQuery(self)
-
-
-class LUNodeQueryvols(NoHooksLU):
-  """Logical unit for getting volumes on node(s).
-
-  """
-  REQ_BGL = False
-  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
-  _FIELDS_STATIC = utils.FieldSet("node")
-
-  def CheckArguments(self):
-    _CheckOutputFields(static=self._FIELDS_STATIC,
-                       dynamic=self._FIELDS_DYNAMIC,
-                       selected=self.op.output_fields)
-
-  def ExpandNames(self):
-    self.share_locks = _ShareAll()
-
-    if self.op.nodes:
-      self.needed_locks = {
-        locking.LEVEL_NODE: _GetWantedNodes(self, self.op.nodes),
-        }
-    else:
-      self.needed_locks = {
-        locking.LEVEL_NODE: locking.ALL_SET,
-        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
-        }
-
-  def Exec(self, feedback_fn):
-    """Computes the list of nodes and their attributes.
-
-    """
-    nodenames = self.owned_locks(locking.LEVEL_NODE)
-    volumes = self.rpc.call_node_volumes(nodenames)
-
-    ilist = self.cfg.GetAllInstancesInfo()
-    vol2inst = _MapInstanceDisksToNodes(ilist.values())
-
-    output = []
-    for node in nodenames:
-      nresult = volumes[node]
-      if nresult.offline:
-        continue
-      msg = nresult.fail_msg
-      if msg:
-        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
-        continue
-
-      node_vols = sorted(nresult.payload,
-                         key=operator.itemgetter("dev"))
-
-      for vol in node_vols:
-        node_output = []
-        for field in self.op.output_fields:
-          if field == "node":
-            val = node
-          elif field == "phys":
-            val = vol["dev"]
-          elif field == "vg":
-            val = vol["vg"]
-          elif field == "name":
-            val = vol["name"]
-          elif field == "size":
-            val = int(float(vol["size"]))
-          elif field == "instance":
-            val = vol2inst.get((node, vol["vg"] + "/" + vol["name"]), "-")
-          else:
-            raise errors.ParameterError(field)
-          node_output.append(str(val))
-
-        output.append(node_output)
-
-    return output
-
-
-class LUNodeQueryStorage(NoHooksLU):
-  """Logical unit for getting information on storage units on node(s).
-
-  """
-  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
-  REQ_BGL = False
-
-  def CheckArguments(self):
-    _CheckOutputFields(static=self._FIELDS_STATIC,
-                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
-                       selected=self.op.output_fields)
-
-  def ExpandNames(self):
-    self.share_locks = _ShareAll()
-
-    if self.op.nodes:
-      self.needed_locks = {
-        locking.LEVEL_NODE: _GetWantedNodes(self, self.op.nodes),
-        }
-    else:
-      self.needed_locks = {
-        locking.LEVEL_NODE: locking.ALL_SET,
-        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
-        }
-
-  def Exec(self, feedback_fn):
-    """Computes the list of nodes and their attributes.
-
-    """
-    self.nodes = self.owned_locks(locking.LEVEL_NODE)
-
-    # Always get name to sort by
-    if constants.SF_NAME in self.op.output_fields:
-      fields = self.op.output_fields[:]
-    else:
-      fields = [constants.SF_NAME] + self.op.output_fields
-
-    # Never ask for node or type as it's only known to the LU
-    for extra in [constants.SF_NODE, constants.SF_TYPE]:
-      while extra in fields:
-        fields.remove(extra)
-
-    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
-    name_idx = field_idx[constants.SF_NAME]
-
-    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
-    data = self.rpc.call_storage_list(self.nodes,
-                                      self.op.storage_type, st_args,
-                                      self.op.name, fields)
-
-    result = []
-
-    for node in utils.NiceSort(self.nodes):
-      nresult = data[node]
-      if nresult.offline:
-        continue
-
-      msg = nresult.fail_msg
-      if msg:
-        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
-        continue
-
-      rows = dict([(row[name_idx], row) for row in nresult.payload])
-
-      for name in utils.NiceSort(rows.keys()):
-        row = rows[name]
-
-        out = []
-
-        for field in self.op.output_fields:
-          if field == constants.SF_NODE:
-            val = node
-          elif field == constants.SF_TYPE:
-            val = self.op.storage_type
-          elif field in field_idx:
-            val = row[field_idx[field]]
-          else:
-            raise errors.ParameterError(field)
-
-          out.append(val)
-
-        result.append(out)
-
-    return result
-
-
 class _InstanceQuery(_QueryBase):
   FIELDS = query.INSTANCE_FIELDS
 
@@ -1833,887 +1253,140 @@ class _InstanceQuery(_QueryBase):
     owned_groups = frozenset(lu.owned_locks(locking.LEVEL_NODEGROUP))
 
     # Check if node groups for locked instances are still correct
-    for instance_name in owned_instances:
-      _CheckInstanceNodeGroups(lu.cfg, instance_name, owned_groups)
-
-  def _GetQueryData(self, lu):
-    """Computes the list of instances and their attributes.
-
-    """
-    if self.do_grouplocks:
-      self._CheckGroupLocks(lu)
-
-    cluster = lu.cfg.GetClusterInfo()
-    all_info = lu.cfg.GetAllInstancesInfo()
-
-    instance_names = self._GetNames(lu, all_info.keys(), locking.LEVEL_INSTANCE)
-
-    instance_list = [all_info[name] for name in instance_names]
-    nodes = frozenset(itertools.chain(*(inst.all_nodes
-                                        for inst in instance_list)))
-    hv_list = list(set([inst.hypervisor for inst in instance_list]))
-    bad_nodes = []
-    offline_nodes = []
-    wrongnode_inst = set()
-
-    # Gather data as requested
-    if self.requested_data & set([query.IQ_LIVE, query.IQ_CONSOLE]):
-      live_data = {}
-      node_data = lu.rpc.call_all_instances_info(nodes, hv_list)
-      for name in nodes:
-        result = node_data[name]
-        if result.offline:
-          # offline nodes will be in both lists
-          assert result.fail_msg
-          offline_nodes.append(name)
-        if result.fail_msg:
-          bad_nodes.append(name)
-        elif result.payload:
-          for inst in result.payload:
-            if inst in all_info:
-              if all_info[inst].primary_node == name:
-                live_data.update(result.payload)
-              else:
-                wrongnode_inst.add(inst)
-            else:
-              # orphan instance; we don't list it here as we don't
-              # handle this case yet in the output of instance listing
-              logging.warning("Orphan instance '%s' found on node %s",
-                              inst, name)
-        # else no instance is alive
-    else:
-      live_data = {}
-
-    if query.IQ_DISKUSAGE in self.requested_data:
-      gmi = ganeti.masterd.instance
-      disk_usage = dict((inst.name,
-                         gmi.ComputeDiskSize(inst.disk_template,
-                                             [{constants.IDISK_SIZE: disk.size}
-                                              for disk in inst.disks]))
-                        for inst in instance_list)
-    else:
-      disk_usage = None
-
-    if query.IQ_CONSOLE in self.requested_data:
-      consinfo = {}
-      for inst in instance_list:
-        if inst.name in live_data:
-          # Instance is running
-          consinfo[inst.name] = _GetInstanceConsole(cluster, inst)
-        else:
-          consinfo[inst.name] = None
-      assert set(consinfo.keys()) == set(instance_names)
-    else:
-      consinfo = None
-
-    if query.IQ_NODES in self.requested_data:
-      node_names = set(itertools.chain(*map(operator.attrgetter("all_nodes"),
-                                            instance_list)))
-      nodes = dict(lu.cfg.GetMultiNodeInfo(node_names))
-      groups = dict((uuid, lu.cfg.GetNodeGroup(uuid))
-                    for uuid in set(map(operator.attrgetter("group"),
-                                        nodes.values())))
-    else:
-      nodes = None
-      groups = None
-
-    if query.IQ_NETWORKS in self.requested_data:
-      net_uuids = itertools.chain(*(lu.cfg.GetInstanceNetworks(i.name)
-                                    for i in instance_list))
-      networks = dict((uuid, lu.cfg.GetNetwork(uuid)) for uuid in net_uuids)
-    else:
-      networks = None
-
-    return query.InstanceQueryData(instance_list, lu.cfg.GetClusterInfo(),
-                                   disk_usage, offline_nodes, bad_nodes,
-                                   live_data, wrongnode_inst, consinfo,
-                                   nodes, groups, networks)
-
-
-class LUQuery(NoHooksLU):
-  """Query for resources/items of a certain kind.
-
-  """
-  # pylint: disable=W0142
-  REQ_BGL = False
-
-  def CheckArguments(self):
-    qcls = _GetQueryImplementation(self.op.what)
-
-    self.impl = qcls(self.op.qfilter, self.op.fields, self.op.use_locking)
-
-  def ExpandNames(self):
-    self.impl.ExpandNames(self)
-
-  def DeclareLocks(self, level):
-    self.impl.DeclareLocks(self, level)
-
-  def Exec(self, feedback_fn):
-    return self.impl.NewStyleQuery(self)
-
-
-class LUQueryFields(NoHooksLU):
-  """Query for resources/items of a certain kind.
-
-  """
-  # pylint: disable=W0142
-  REQ_BGL = False
-
-  def CheckArguments(self):
-    self.qcls = _GetQueryImplementation(self.op.what)
-
-  def ExpandNames(self):
-    self.needed_locks = {}
-
-  def Exec(self, feedback_fn):
-    return query.QueryFields(self.qcls.FIELDS, self.op.fields)
-
-
-class LUNodeModifyStorage(NoHooksLU):
-  """Logical unit for modifying a storage volume on a node.
-
-  """
-  REQ_BGL = False
-
-  def CheckArguments(self):
-    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
-
-    storage_type = self.op.storage_type
-
-    try:
-      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
-    except KeyError:
-      raise errors.OpPrereqError("Storage units of type '%s' can not be"
-                                 " modified" % storage_type,
-                                 errors.ECODE_INVAL)
-
-    diff = set(self.op.changes.keys()) - modifiable
-    if diff:
-      raise errors.OpPrereqError("The following fields can not be modified for"
-                                 " storage units of type '%s': %r" %
-                                 (storage_type, list(diff)),
-                                 errors.ECODE_INVAL)
-
-  def ExpandNames(self):
-    self.needed_locks = {
-      locking.LEVEL_NODE: self.op.node_name,
-      }
-
-  def Exec(self, feedback_fn):
-    """Computes the list of nodes and their attributes.
-
-    """
-    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
-    result = self.rpc.call_storage_modify(self.op.node_name,
-                                          self.op.storage_type, st_args,
-                                          self.op.name, self.op.changes)
-    result.Raise("Failed to modify storage unit '%s' on %s" %
-                 (self.op.name, self.op.node_name))
-
-
-class LUNodeAdd(LogicalUnit):
-  """Logical unit for adding node to the cluster.
-
-  """
-  HPATH = "node-add"
-  HTYPE = constants.HTYPE_NODE
-  _NFLAGS = ["master_capable", "vm_capable"]
-
-  def CheckArguments(self):
-    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
-    # validate/normalize the node name
-    self.hostname = netutils.GetHostname(name=self.op.node_name,
-                                         family=self.primary_ip_family)
-    self.op.node_name = self.hostname.name
-
-    if self.op.readd and self.op.node_name == self.cfg.GetMasterNode():
-      raise errors.OpPrereqError("Cannot readd the master node",
-                                 errors.ECODE_STATE)
-
-    if self.op.readd and self.op.group:
-      raise errors.OpPrereqError("Cannot pass a node group when a node is"
-                                 " being readded", errors.ECODE_INVAL)
-
-  def BuildHooksEnv(self):
-    """Build hooks env.
-
-    This will run on all nodes before, and on all nodes + the new node after.
-
-    """
-    return {
-      "OP_TARGET": self.op.node_name,
-      "NODE_NAME": self.op.node_name,
-      "NODE_PIP": self.op.primary_ip,
-      "NODE_SIP": self.op.secondary_ip,
-      "MASTER_CAPABLE": str(self.op.master_capable),
-      "VM_CAPABLE": str(self.op.vm_capable),
-      }
-
-  def BuildHooksNodes(self):
-    """Build hooks nodes.
-
-    """
-    # Exclude added node
-    pre_nodes = list(set(self.cfg.GetNodeList()) - set([self.op.node_name]))
-    post_nodes = pre_nodes + [self.op.node_name, ]
-
-    return (pre_nodes, post_nodes)
-
-  def CheckPrereq(self):
-    """Check prerequisites.
-
-    This checks:
-     - the new node is not already in the config
-     - it is resolvable
-     - its parameters (single/dual homed) matches the cluster
-
-    Any errors are signaled by raising errors.OpPrereqError.
-
-    """
-    cfg = self.cfg
-    hostname = self.hostname
-    node = hostname.name
-    primary_ip = self.op.primary_ip = hostname.ip
-    if self.op.secondary_ip is None:
-      if self.primary_ip_family == netutils.IP6Address.family:
-        raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
-                                   " IPv4 address must be given as secondary",
-                                   errors.ECODE_INVAL)
-      self.op.secondary_ip = primary_ip
-
-    secondary_ip = self.op.secondary_ip
-    if not netutils.IP4Address.IsValid(secondary_ip):
-      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
-                                 " address" % secondary_ip, errors.ECODE_INVAL)
-
-    node_list = cfg.GetNodeList()
-    if not self.op.readd and node in node_list:
-      raise errors.OpPrereqError("Node %s is already in the configuration" %
-                                 node, errors.ECODE_EXISTS)
-    elif self.op.readd and node not in node_list:
-      raise errors.OpPrereqError("Node %s is not in the configuration" % node,
-                                 errors.ECODE_NOENT)
-
-    self.changed_primary_ip = False
-
-    for existing_node_name, existing_node in cfg.GetMultiNodeInfo(node_list):
-      if self.op.readd and node == existing_node_name:
-        if existing_node.secondary_ip != secondary_ip:
-          raise errors.OpPrereqError("Readded node doesn't have the same IP"
-                                     " address configuration as before",
-                                     errors.ECODE_INVAL)
-        if existing_node.primary_ip != primary_ip:
-          self.changed_primary_ip = True
-
-        continue
-
-      if (existing_node.primary_ip == primary_ip or
-          existing_node.secondary_ip == primary_ip or
-          existing_node.primary_ip == secondary_ip or
-          existing_node.secondary_ip == secondary_ip):
-        raise errors.OpPrereqError("New node ip address(es) conflict with"
-                                   " existing node %s" % existing_node.name,
-                                   errors.ECODE_NOTUNIQUE)
-
-    # After this 'if' block, None is no longer a valid value for the
-    # _capable op attributes
-    if self.op.readd:
-      old_node = self.cfg.GetNodeInfo(node)
-      assert old_node is not None, "Can't retrieve locked node %s" % node
-      for attr in self._NFLAGS:
-        if getattr(self.op, attr) is None:
-          setattr(self.op, attr, getattr(old_node, attr))
-    else:
-      for attr in self._NFLAGS:
-        if getattr(self.op, attr) is None:
-          setattr(self.op, attr, True)
-
-    if self.op.readd and not self.op.vm_capable:
-      pri, sec = cfg.GetNodeInstances(node)
-      if pri or sec:
-        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
-                                   " flag set to false, but it already holds"
-                                   " instances" % node,
-                                   errors.ECODE_STATE)
-
-    # check that the type of the node (single versus dual homed) is the
-    # same as for the master
-    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
-    master_singlehomed = myself.secondary_ip == myself.primary_ip
-    newbie_singlehomed = secondary_ip == primary_ip
-    if master_singlehomed != newbie_singlehomed:
-      if master_singlehomed:
-        raise errors.OpPrereqError("The master has no secondary ip but the"
-                                   " new node has one",
-                                   errors.ECODE_INVAL)
-      else:
-        raise errors.OpPrereqError("The master has a secondary ip but the"
-                                   " new node doesn't have one",
-                                   errors.ECODE_INVAL)
-
-    # checks reachability
-    if not netutils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
-      raise errors.OpPrereqError("Node not reachable by ping",
-                                 errors.ECODE_ENVIRON)
-
-    if not newbie_singlehomed:
-      # check reachability from my secondary ip to newbie's secondary ip
-      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
-                              source=myself.secondary_ip):
-        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
-                                   " based ping to node daemon port",
-                                   errors.ECODE_ENVIRON)
-
-    if self.op.readd:
-      exceptions = [node]
-    else:
-      exceptions = []
-
-    if self.op.master_capable:
-      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
-    else:
-      self.master_candidate = False
-
-    if self.op.readd:
-      self.new_node = old_node
-    else:
-      node_group = cfg.LookupNodeGroup(self.op.group)
-      self.new_node = objects.Node(name=node,
-                                   primary_ip=primary_ip,
-                                   secondary_ip=secondary_ip,
-                                   master_candidate=self.master_candidate,
-                                   offline=False, drained=False,
-                                   group=node_group, ndparams={})
-
-    if self.op.ndparams:
-      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
-      _CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
-                            "node", "cluster or group")
-
-    if self.op.hv_state:
-      self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state, None)
-
-    if self.op.disk_state:
-      self.new_disk_state = _MergeAndVerifyDiskState(self.op.disk_state, None)
-
-    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
-    #       it a property on the base class.
-    rpcrunner = rpc.DnsOnlyRunner()
-    result = rpcrunner.call_version([node])[node]
-    result.Raise("Can't get version information from node %s" % node)
-    if constants.PROTOCOL_VERSION == result.payload:
-      logging.info("Communication to node %s fine, sw version %s match",
-                   node, result.payload)
-    else:
-      raise errors.OpPrereqError("Version mismatch master version %s,"
-                                 " node version %s" %
-                                 (constants.PROTOCOL_VERSION, result.payload),
-                                 errors.ECODE_ENVIRON)
-
-    vg_name = cfg.GetVGName()
-    if vg_name is not None:
-      vparams = {constants.NV_PVLIST: [vg_name]}
-      excl_stor = _IsExclusiveStorageEnabledNode(cfg, self.new_node)
-      cname = self.cfg.GetClusterName()
-      result = rpcrunner.call_node_verify_light([node], vparams, cname)[node]
-      (errmsgs, _) = _CheckNodePVs(result.payload, excl_stor)
-      if errmsgs:
-        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
-                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)
-
-  def Exec(self, feedback_fn):
-    """Adds the new node to the cluster.
-
-    """
-    new_node = self.new_node
-    node = new_node.name
-
-    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
-      "Not owning BGL"
-
-    # We're adding a new node, so we assume it's powered
-    new_node.powered = True
-
-    # for re-adds, reset the offline/drained/master-candidate flags;
-    # we need to reset here, otherwise offline would prevent RPC calls
-    # later in the procedure; this also means that if the re-add
-    # fails, we are left with a non-offlined, broken node
-    if self.op.readd:
-      new_node.drained = new_node.offline = False # pylint: disable=W0201
-      self.LogInfo("Readding a node, the offline/drained flags were reset")
-      # if we demote the node, we do cleanup later in the procedure
-      new_node.master_candidate = self.master_candidate
-      if self.changed_primary_ip:
-        new_node.primary_ip = self.op.primary_ip
-
-    # copy the master/vm_capable flags
-    for attr in self._NFLAGS:
-      setattr(new_node, attr, getattr(self.op, attr))
-
-    # notify the user about any possible mc promotion
-    if new_node.master_candidate:
-      self.LogInfo("Node will be a master candidate")
-
-    if self.op.ndparams:
-      new_node.ndparams = self.op.ndparams
-    else:
-      new_node.ndparams = {}
-
-    if self.op.hv_state:
-      new_node.hv_state_static = self.new_hv_state
-
-    if self.op.disk_state:
-      new_node.disk_state_static = self.new_disk_state
-
-    # Add node to our /etc/hosts, and add key to known_hosts
-    if self.cfg.GetClusterInfo().modify_etc_hosts:
-      master_node = self.cfg.GetMasterNode()
-      result = self.rpc.call_etc_hosts_modify(master_node,
-                                              constants.ETC_HOSTS_ADD,
-                                              self.hostname.name,
-                                              self.hostname.ip)
-      result.Raise("Can't update hosts file with new host data")
-
-    if new_node.secondary_ip != new_node.primary_ip:
-      _CheckNodeHasSecondaryIP(self, new_node.name, new_node.secondary_ip,
-                               False)
-
-    node_verify_list = [self.cfg.GetMasterNode()]
-    node_verify_param = {
-      constants.NV_NODELIST: ([node], {}),
-      # TODO: do a node-net-test as well?
-    }
-
-    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
-                                       self.cfg.GetClusterName())
-    for verifier in node_verify_list:
-      result[verifier].Raise("Cannot communicate with node %s" % verifier)
-      nl_payload = result[verifier].payload[constants.NV_NODELIST]
-      if nl_payload:
-        for failed in nl_payload:
-          feedback_fn("ssh/hostname verification failed"
-                      " (checking from %s): %s" %
-                      (verifier, nl_payload[failed]))
-        raise errors.OpExecError("ssh/hostname verification failed")
-
-    if self.op.readd:
-      _RedistributeAncillaryFiles(self)
-      self.context.ReaddNode(new_node)
-      # make sure we redistribute the config
-      self.cfg.Update(new_node, feedback_fn)
-      # and make sure the new node will not have old files around
-      if not new_node.master_candidate:
-        result = self.rpc.call_node_demote_from_mc(new_node.name)
-        msg = result.fail_msg
-        if msg:
-          self.LogWarning("Node failed to demote itself from master"
-                          " candidate status: %s" % msg)
-    else:
-      _RedistributeAncillaryFiles(self, additional_nodes=[node],
-                                  additional_vm=self.op.vm_capable)
-      self.context.AddNode(new_node, self.proc.GetECId())
-
-
-class LUNodeSetParams(LogicalUnit):
-  """Modifies the parameters of a node.
-
-  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
-      to the node role (as _ROLE_*)
-  @cvar _R2F: a dictionary from node role to tuples of flags
-  @cvar _FLAGS: a list of attribute names corresponding to the flags
-
-  """
-  HPATH = "node-modify"
-  HTYPE = constants.HTYPE_NODE
-  REQ_BGL = False
-  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
-  _F2R = {
-    (True, False, False): _ROLE_CANDIDATE,
-    (False, True, False): _ROLE_DRAINED,
-    (False, False, True): _ROLE_OFFLINE,
-    (False, False, False): _ROLE_REGULAR,
-    }
-  _R2F = dict((v, k) for k, v in _F2R.items())
-  _FLAGS = ["master_candidate", "drained", "offline"]
-
-  def CheckArguments(self):
-    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
-    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
-                self.op.master_capable, self.op.vm_capable,
-                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
-                self.op.disk_state]
-    if all_mods.count(None) == len(all_mods):
-      raise errors.OpPrereqError("Please pass at least one modification",
-                                 errors.ECODE_INVAL)
-    if all_mods.count(True) > 1:
-      raise errors.OpPrereqError("Can't set the node into more than one"
-                                 " state at the same time",
-                                 errors.ECODE_INVAL)
-
-    # Boolean value that tells us whether we might be demoting from MC
-    self.might_demote = (self.op.master_candidate is False or
-                         self.op.offline is True or
-                         self.op.drained is True or
-                         self.op.master_capable is False)
-
-    if self.op.secondary_ip:
-      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
-        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
-                                   " address" % self.op.secondary_ip,
-                                   errors.ECODE_INVAL)
-
-    self.lock_all = self.op.auto_promote and self.might_demote
-    self.lock_instances = self.op.secondary_ip is not None
-
-  def _InstanceFilter(self, instance):
-    """Filter for getting affected instances.
-
-    """
-    return (instance.disk_template in constants.DTS_INT_MIRROR and
-            self.op.node_name in instance.all_nodes)
-
-  def ExpandNames(self):
-    if self.lock_all:
-      self.needed_locks = {
-        locking.LEVEL_NODE: locking.ALL_SET,
-
-        # Block allocations when all nodes are locked
-        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
-        }
-    else:
-      self.needed_locks = {
-        locking.LEVEL_NODE: self.op.node_name,
-        }
-
-    # Since modifying a node can have severe effects on currently running
-    # operations the resource lock is at least acquired in shared mode
-    self.needed_locks[locking.LEVEL_NODE_RES] = \
-      self.needed_locks[locking.LEVEL_NODE]
-
-    # Get all locks except nodes in shared mode; they are not used for anything
-    # but read-only access
-    self.share_locks = _ShareAll()
-    self.share_locks[locking.LEVEL_NODE] = 0
-    self.share_locks[locking.LEVEL_NODE_RES] = 0
-    self.share_locks[locking.LEVEL_NODE_ALLOC] = 0
-
-    if self.lock_instances:
-      self.needed_locks[locking.LEVEL_INSTANCE] = \
-        frozenset(self.cfg.GetInstancesInfoByFilter(self._InstanceFilter))
-
-  def BuildHooksEnv(self):
-    """Build hooks env.
-
-    This runs on the master node.
-
-    """
-    return {
-      "OP_TARGET": self.op.node_name,
-      "MASTER_CANDIDATE": str(self.op.master_candidate),
-      "OFFLINE": str(self.op.offline),
-      "DRAINED": str(self.op.drained),
-      "MASTER_CAPABLE": str(self.op.master_capable),
-      "VM_CAPABLE": str(self.op.vm_capable),
-      }
-
-  def BuildHooksNodes(self):
-    """Build hooks nodes.
-
-    """
-    nl = [self.cfg.GetMasterNode(), self.op.node_name]
-    return (nl, nl)
-
-  def CheckPrereq(self):
-    """Check prerequisites.
+    for instance_name in owned_instances:
+      _CheckInstanceNodeGroups(lu.cfg, instance_name, owned_groups)
 
-    This only checks the instance list against the existing names.
+  def _GetQueryData(self, lu):
+    """Computes the list of instances and their attributes.
 
     """
-    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
+    if self.do_grouplocks:
+      self._CheckGroupLocks(lu)
 
-    if self.lock_instances:
-      affected_instances = \
-        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
+    cluster = lu.cfg.GetClusterInfo()
+    all_info = lu.cfg.GetAllInstancesInfo()
 
-      # Verify instance locks
-      owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
-      wanted_instances = frozenset(affected_instances.keys())
-      if wanted_instances - owned_instances:
-        raise errors.OpPrereqError("Instances affected by changing node %s's"
-                                   " secondary IP address have changed since"
-                                   " locks were acquired, wanted '%s', have"
-                                   " '%s'; retry the operation" %
-                                   (self.op.node_name,
-                                    utils.CommaJoin(wanted_instances),
-                                    utils.CommaJoin(owned_instances)),
-                                   errors.ECODE_STATE)
-    else:
-      affected_instances = None
-
-    if (self.op.master_candidate is not None or
-        self.op.drained is not None or
-        self.op.offline is not None):
-      # we can't change the master's node flags
-      if self.op.node_name == self.cfg.GetMasterNode():
-        raise errors.OpPrereqError("The master role can be changed"
-                                   " only via master-failover",
-                                   errors.ECODE_INVAL)
+    instance_names = self._GetNames(lu, all_info.keys(), locking.LEVEL_INSTANCE)
 
-    if self.op.master_candidate and not node.master_capable:
-      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
-                                 " it a master candidate" % node.name,
-                                 errors.ECODE_STATE)
+    instance_list = [all_info[name] for name in instance_names]
+    nodes = frozenset(itertools.chain(*(inst.all_nodes
+                                        for inst in instance_list)))
+    hv_list = list(set([inst.hypervisor for inst in instance_list]))
+    bad_nodes = []
+    offline_nodes = []
+    wrongnode_inst = set()
 
-    if self.op.vm_capable is False:
-      (ipri, isec) = self.cfg.GetNodeInstances(self.op.node_name)
-      if ipri or isec:
-        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
-                                   " the vm_capable flag" % node.name,
-                                   errors.ECODE_STATE)
+    # Gather data as requested
+    if self.requested_data & set([query.IQ_LIVE, query.IQ_CONSOLE]):
+      live_data = {}
+      node_data = lu.rpc.call_all_instances_info(nodes, hv_list)
+      for name in nodes:
+        result = node_data[name]
+        if result.offline:
+          # offline nodes will be in both lists
+          assert result.fail_msg
+          offline_nodes.append(name)
+        if result.fail_msg:
+          bad_nodes.append(name)
+        elif result.payload:
+          for inst in result.payload:
+            if inst in all_info:
+              if all_info[inst].primary_node == name:
+                live_data.update(result.payload)
+              else:
+                wrongnode_inst.add(inst)
+            else:
+              # orphan instance; we don't list it here as we don't
+              # handle this case yet in the output of instance listing
+              logging.warning("Orphan instance '%s' found on node %s",
+                              inst, name)
+        # else no instance is alive
+    else:
+      live_data = {}
 
-    if node.master_candidate and self.might_demote and not self.lock_all:
-      assert not self.op.auto_promote, "auto_promote set but lock_all not"
-      # check if after removing the current node, we're missing master
-      # candidates
-      (mc_remaining, mc_should, _) = \
-          self.cfg.GetMasterCandidateStats(exceptions=[node.name])
-      if mc_remaining < mc_should:
-        raise errors.OpPrereqError("Not enough master candidates, please"
-                                   " pass auto promote option to allow"
-                                   " promotion (--auto-promote or RAPI"
-                                   " auto_promote=True)", errors.ECODE_STATE)
-
-    self.old_flags = old_flags = (node.master_candidate,
-                                  node.drained, node.offline)
-    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
-    self.old_role = old_role = self._F2R[old_flags]
-
-    # Check for ineffective changes
-    for attr in self._FLAGS:
-      if (getattr(self.op, attr) is False and getattr(node, attr) is False):
-        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
-        setattr(self.op, attr, None)
-
-    # Past this point, any flag change to False means a transition
-    # away from the respective state, as only real changes are kept
-
-    # TODO: We might query the real power state if it supports OOB
-    if _SupportsOob(self.cfg, node):
-      if self.op.offline is False and not (node.powered or
-                                           self.op.powered is True):
-        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
-                                    " offline status can be reset") %
-                                   self.op.node_name, errors.ECODE_STATE)
-    elif self.op.powered is not None:
-      raise errors.OpPrereqError(("Unable to change powered state for node %s"
-                                  " as it does not support out-of-band"
-                                  " handling") % self.op.node_name,
-                                 errors.ECODE_STATE)
+    if query.IQ_DISKUSAGE in self.requested_data:
+      gmi = ganeti.masterd.instance
+      disk_usage = dict((inst.name,
+                         gmi.ComputeDiskSize(inst.disk_template,
+                                             [{constants.IDISK_SIZE: disk.size}
+                                              for disk in inst.disks]))
+                        for inst in instance_list)
+    else:
+      disk_usage = None
 
-    # If we're being deofflined/drained, we'll MC ourself if needed
-    if (self.op.drained is False or self.op.offline is False or
-        (self.op.master_capable and not node.master_capable)):
-      if _DecideSelfPromotion(self):
-        self.op.master_candidate = True
-        self.LogInfo("Auto-promoting node to master candidate")
-
-    # If we're no longer master capable, we'll demote ourselves from MC
-    if self.op.master_capable is False and node.master_candidate:
-      self.LogInfo("Demoting from master candidate")
-      self.op.master_candidate = False
-
-    # Compute new role
-    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
-    if self.op.master_candidate:
-      new_role = self._ROLE_CANDIDATE
-    elif self.op.drained:
-      new_role = self._ROLE_DRAINED
-    elif self.op.offline:
-      new_role = self._ROLE_OFFLINE
-    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
-      # False is still in new flags, which means we're un-setting (the
-      # only) True flag
-      new_role = self._ROLE_REGULAR
-    else: # no new flags, nothing, keep old role
-      new_role = old_role
-
-    self.new_role = new_role
-
-    if old_role == self._ROLE_OFFLINE and new_role != old_role:
-      # Trying to transition out of offline status
-      result = self.rpc.call_version([node.name])[node.name]
-      if result.fail_msg:
-        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
-                                   " to report its version: %s" %
-                                   (node.name, result.fail_msg),
-                                   errors.ECODE_STATE)
-      else:
-        self.LogWarning("Transitioning node from offline to online state"
-                        " without using re-add. Please make sure the node"
-                        " is healthy!")
-
-    # When changing the secondary ip, verify if this is a single-homed to
-    # multi-homed transition or vice versa, and apply the relevant
-    # restrictions.
-    if self.op.secondary_ip:
-      # Ok even without locking, because this can't be changed by any LU
-      master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
-      master_singlehomed = master.secondary_ip == master.primary_ip
-      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
-        if self.op.force and node.name == master.name:
-          self.LogWarning("Transitioning from single-homed to multi-homed"
-                          " cluster; all nodes will require a secondary IP"
-                          " address")
-        else:
-          raise errors.OpPrereqError("Changing the secondary ip on a"
-                                     " single-homed cluster requires the"
-                                     " --force option to be passed, and the"
-                                     " target node to be the master",
-                                     errors.ECODE_INVAL)
-      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
-        if self.op.force and node.name == master.name:
-          self.LogWarning("Transitioning from multi-homed to single-homed"
-                          " cluster; secondary IP addresses will have to be"
-                          " removed")
+    if query.IQ_CONSOLE in self.requested_data:
+      consinfo = {}
+      for inst in instance_list:
+        if inst.name in live_data:
+          # Instance is running
+          consinfo[inst.name] = _GetInstanceConsole(cluster, inst)
         else:
-          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
-                                     " same as the primary IP on a multi-homed"
-                                     " cluster, unless the --force option is"
-                                     " passed, and the target node is the"
-                                     " master", errors.ECODE_INVAL)
-
-      assert not (frozenset(affected_instances) -
-                  self.owned_locks(locking.LEVEL_INSTANCE))
-
-      if node.offline:
-        if affected_instances:
-          msg = ("Cannot change secondary IP address: offline node has"
-                 " instances (%s) configured to use it" %
-                 utils.CommaJoin(affected_instances.keys()))
-          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
-      else:
-        # On online nodes, check that no instances are running, and that
-        # the node has the new ip and we can reach it.
-        for instance in affected_instances.values():
-          _CheckInstanceState(self, instance, INSTANCE_DOWN,
-                              msg="cannot change secondary ip")
-
-        _CheckNodeHasSecondaryIP(self, node.name, self.op.secondary_ip, True)
-        if master.name != node.name:
-          # check reachability from master secondary ip to new secondary ip
-          if not netutils.TcpPing(self.op.secondary_ip,
-                                  constants.DEFAULT_NODED_PORT,
-                                  source=master.secondary_ip):
-            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
-                                       " based ping to node daemon port",
-                                       errors.ECODE_ENVIRON)
-
-    if self.op.ndparams:
-      new_ndparams = _GetUpdatedParams(self.node.ndparams, self.op.ndparams)
-      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
-      _CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
-                            "node", "cluster or group")
-      self.new_ndparams = new_ndparams
-
-    if self.op.hv_state:
-      self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state,
-                                                 self.node.hv_state_static)
-
-    if self.op.disk_state:
-      self.new_disk_state = \
-        _MergeAndVerifyDiskState(self.op.disk_state,
-                                 self.node.disk_state_static)
-
-  def Exec(self, feedback_fn):
-    """Modifies a node.
-
-    """
-    node = self.node
-    old_role = self.old_role
-    new_role = self.new_role
-
-    result = []
-
-    if self.op.ndparams:
-      node.ndparams = self.new_ndparams
+          consinfo[inst.name] = None
+      assert set(consinfo.keys()) == set(instance_names)
+    else:
+      consinfo = None
 
-    if self.op.powered is not None:
-      node.powered = self.op.powered
+    if query.IQ_NODES in self.requested_data:
+      node_names = set(itertools.chain(*map(operator.attrgetter("all_nodes"),
+                                            instance_list)))
+      nodes = dict(lu.cfg.GetMultiNodeInfo(node_names))
+      groups = dict((uuid, lu.cfg.GetNodeGroup(uuid))
+                    for uuid in set(map(operator.attrgetter("group"),
+                                        nodes.values())))
+    else:
+      nodes = None
+      groups = None
 
-    if self.op.hv_state:
-      node.hv_state_static = self.new_hv_state
+    if query.IQ_NETWORKS in self.requested_data:
+      net_uuids = itertools.chain(*(lu.cfg.GetInstanceNetworks(i.name)
+                                    for i in instance_list))
+      networks = dict((uuid, lu.cfg.GetNetwork(uuid)) for uuid in net_uuids)
+    else:
+      networks = None
 
-    if self.op.disk_state:
-      node.disk_state_static = self.new_disk_state
+    return query.InstanceQueryData(instance_list, lu.cfg.GetClusterInfo(),
+                                   disk_usage, offline_nodes, bad_nodes,
+                                   live_data, wrongnode_inst, consinfo,
+                                   nodes, groups, networks)
 
-    for attr in ["master_capable", "vm_capable"]:
-      val = getattr(self.op, attr)
-      if val is not None:
-        setattr(node, attr, val)
-        result.append((attr, str(val)))
 
-    if new_role != old_role:
-      # Tell the node to demote itself, if no longer MC and not offline
-      if old_role == self._ROLE_CANDIDATE and new_role != self._ROLE_OFFLINE:
-        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
-        if msg:
-          self.LogWarning("Node failed to demote itself: %s", msg)
+class LUQuery(NoHooksLU):
+  """Query for resources/items of a certain kind.
 
-      new_flags = self._R2F[new_role]
-      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
-        if of != nf:
-          result.append((desc, str(nf)))
-      (node.master_candidate, node.drained, node.offline) = new_flags
+  """
+  # pylint: disable=W0142
+  REQ_BGL = False
 
-      # we locked all nodes, we adjust the CP before updating this node
-      if self.lock_all:
-        _AdjustCandidatePool(self, [node.name])
+  def CheckArguments(self):
+    qcls = _GetQueryImplementation(self.op.what)
 
-    if self.op.secondary_ip:
-      node.secondary_ip = self.op.secondary_ip
-      result.append(("secondary_ip", self.op.secondary_ip))
+    self.impl = qcls(self.op.qfilter, self.op.fields, self.op.use_locking)
 
-    # this will trigger configuration file update, if needed
-    self.cfg.Update(node, feedback_fn)
+  def ExpandNames(self):
+    self.impl.ExpandNames(self)
 
-    # this will trigger job queue propagation or cleanup if the mc
-    # flag changed
-    if [old_role, new_role].count(self._ROLE_CANDIDATE) == 1:
-      self.context.ReaddNode(node)
+  def DeclareLocks(self, level):
+    self.impl.DeclareLocks(self, level)
 
-    return result
+  def Exec(self, feedback_fn):
+    return self.impl.NewStyleQuery(self)
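
LUQuery and LUQueryFields are thin dispatchers: CheckArguments resolves a _QueryBase implementation for op.what, and ExpandNames/DeclareLocks/Exec simply forward to it. A minimal standalone sketch of that dispatch pattern follows; FakeNodeQuery, _QUERY_IMPL and GetQueryImplementation are hypothetical stand-ins, not Ganeti API.

class FakeNodeQuery(object):
  FIELDS = ["name", "drained"]

  def __init__(self, qfilter, fields, use_locking):
    self.fields = fields

  def ExpandNames(self, lu):
    pass  # a real implementation would fill in lu.needed_locks here

  def NewStyleQuery(self, lu):
    return [("node1", False)]  # canned data instead of config/RPC lookups

_QUERY_IMPL = {"node": FakeNodeQuery}

def GetQueryImplementation(what):
  try:
    return _QUERY_IMPL[what]
  except KeyError:
    raise NotImplementedError("Unknown query resource '%s'" % what)

impl = GetQueryImplementation("node")(None, ["name"], False)
print(impl.NewStyleQuery(None))  # [('node1', False)]
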
 
 
-class LUNodePowercycle(NoHooksLU):
-  """Powercycles a node.
+class LUQueryFields(NoHooksLU):
+  """Query for resources/items of a certain kind.
 
   """
+  # pylint: disable=W0142
   REQ_BGL = False
 
   def CheckArguments(self):
-    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
-    if self.op.node_name == self.cfg.GetMasterNode() and not self.op.force:
-      raise errors.OpPrereqError("The node is the master and the force"
-                                 " parameter was not set",
-                                 errors.ECODE_INVAL)
+    self.qcls = _GetQueryImplementation(self.op.what)
 
   def ExpandNames(self):
-    """Locking for PowercycleNode.
-
-    This is a last-resort option and shouldn't block on other
-    jobs. Therefore, we grab no locks.
-
-    """
     self.needed_locks = {}
 
   def Exec(self, feedback_fn):
-    """Reboots a node.
-
-    """
-    result = self.rpc.call_node_powercycle(self.op.node_name,
-                                           self.cfg.GetHypervisorType())
-    result.Raise("Failed to schedule the reboot")
-    return result.payload
+    return query.QueryFields(self.qcls.FIELDS, self.op.fields)
 
 
 class LUInstanceActivateDisks(NoHooksLU):
@@ -4414,70 +3087,6 @@ class LUInstanceMove(LogicalUnit):
                                  (instance.name, target_node, msg))
 
 
-class LUNodeMigrate(LogicalUnit):
-  """Migrate all instances from a node.
-
-  """
-  HPATH = "node-migrate"
-  HTYPE = constants.HTYPE_NODE
-  REQ_BGL = False
-
-  def CheckArguments(self):
-    pass
-
-  def ExpandNames(self):
-    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
-
-    self.share_locks = _ShareAll()
-    self.needed_locks = {
-      locking.LEVEL_NODE: [self.op.node_name],
-      }
-
-  def BuildHooksEnv(self):
-    """Build hooks env.
-
-    This runs on the master, the primary and all the secondaries.
-
-    """
-    return {
-      "NODE_NAME": self.op.node_name,
-      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
-      }
-
-  def BuildHooksNodes(self):
-    """Build hooks nodes.
-
-    """
-    nl = [self.cfg.GetMasterNode()]
-    return (nl, nl)
-
-  def CheckPrereq(self):
-    pass
-
-  def Exec(self, feedback_fn):
-    # Prepare jobs for migration instances
-    allow_runtime_changes = self.op.allow_runtime_changes
-    jobs = [
-      [opcodes.OpInstanceMigrate(instance_name=inst.name,
-                                 mode=self.op.mode,
-                                 live=self.op.live,
-                                 iallocator=self.op.iallocator,
-                                 target_node=self.op.target_node,
-                                 allow_runtime_changes=allow_runtime_changes,
-                                 ignore_ipolicy=self.op.ignore_ipolicy)]
-      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name)]
-
-    # TODO: Run iallocator in this opcode and pass correct placement options to
-    # OpInstanceMigrate. Since other jobs can modify the cluster between
-    # running the iallocator and the actual migration, a good consistency model
-    # will have to be found.
-
-    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
-            frozenset([self.op.node_name]))
-
-    return ResultWithJobs(jobs)
-
-
 class TLMigrateInstance(Tasklet):
   """Tasklet class for instance migration.
 
@@ -8142,267 +6751,6 @@ class TLReplaceDisks(Tasklet):
       self._RemoveOldStorage(self.target_node, iv_names)
 
 
-class LURepairNodeStorage(NoHooksLU):
-  """Repairs the volume group on a node.
-
-  """
-  REQ_BGL = False
-
-  def CheckArguments(self):
-    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
-
-    storage_type = self.op.storage_type
-
-    if (constants.SO_FIX_CONSISTENCY not in
-        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
-      raise errors.OpPrereqError("Storage units of type '%s' can not be"
-                                 " repaired" % storage_type,
-                                 errors.ECODE_INVAL)
-
-  def ExpandNames(self):
-    self.needed_locks = {
-      locking.LEVEL_NODE: [self.op.node_name],
-      }
-
-  def _CheckFaultyDisks(self, instance, node_name):
-    """Ensure faulty disks abort the opcode or at least warn."""
-    try:
-      if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
-                                  node_name, True):
-        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
-                                   " node '%s'" % (instance.name, node_name),
-                                   errors.ECODE_STATE)
-    except errors.OpPrereqError, err:
-      if self.op.ignore_consistency:
-        self.LogWarning(str(err.args[0]))
-      else:
-        raise
-
-  def CheckPrereq(self):
-    """Check prerequisites.
-
-    """
-    # Check whether any instance on this node has faulty disks
-    for inst in _GetNodeInstances(self.cfg, self.op.node_name):
-      if inst.admin_state != constants.ADMINST_UP:
-        continue
-      check_nodes = set(inst.all_nodes)
-      check_nodes.discard(self.op.node_name)
-      for inst_node_name in check_nodes:
-        self._CheckFaultyDisks(inst, inst_node_name)
-
-  def Exec(self, feedback_fn):
-    feedback_fn("Repairing storage unit '%s' on %s ..." %
-                (self.op.name, self.op.node_name))
-
-    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
-    result = self.rpc.call_storage_execute(self.op.node_name,
-                                           self.op.storage_type, st_args,
-                                           self.op.name,
-                                           constants.SO_FIX_CONSISTENCY)
-    result.Raise("Failed to repair storage unit '%s' on %s" %
-                 (self.op.name, self.op.node_name))
-
-
-class LUNodeEvacuate(NoHooksLU):
-  """Evacuates instances off a list of nodes.
-
-  """
-  REQ_BGL = False
-
-  _MODE2IALLOCATOR = {
-    constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI,
-    constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC,
-    constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL,
-    }
-  assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES
-  assert (frozenset(_MODE2IALLOCATOR.values()) ==
-          constants.IALLOCATOR_NEVAC_MODES)
-
-  def CheckArguments(self):
-    _CheckIAllocatorOrNode(self, "iallocator", "remote_node")
-
-  def ExpandNames(self):
-    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
-
-    if self.op.remote_node is not None:
-      self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
-      assert self.op.remote_node
-
-      if self.op.remote_node == self.op.node_name:
-        raise errors.OpPrereqError("Can not use evacuated node as a new"
-                                   " secondary node", errors.ECODE_INVAL)
-
-      if self.op.mode != constants.NODE_EVAC_SEC:
-        raise errors.OpPrereqError("Without the use of an iallocator only"
-                                   " secondary instances can be evacuated",
-                                   errors.ECODE_INVAL)
-
-    # Declare locks
-    self.share_locks = _ShareAll()
-    self.needed_locks = {
-      locking.LEVEL_INSTANCE: [],
-      locking.LEVEL_NODEGROUP: [],
-      locking.LEVEL_NODE: [],
-      }
-
-    # Determine nodes (via group) optimistically, needs verification once locks
-    # have been acquired
-    self.lock_nodes = self._DetermineNodes()
-
-  def _DetermineNodes(self):
-    """Gets the list of nodes to operate on.
-
-    """
-    if self.op.remote_node is None:
-      # Iallocator will choose any node(s) in the same group
-      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_name])
-    else:
-      group_nodes = frozenset([self.op.remote_node])
-
-    # Determine nodes to be locked
-    return set([self.op.node_name]) | group_nodes
-
-  def _DetermineInstances(self):
-    """Builds list of instances to operate on.
-
-    """
-    assert self.op.mode in constants.NODE_EVAC_MODES
-
-    if self.op.mode == constants.NODE_EVAC_PRI:
-      # Primary instances only
-      inst_fn = _GetNodePrimaryInstances
-      assert self.op.remote_node is None, \
-        "Evacuating primary instances requires iallocator"
-    elif self.op.mode == constants.NODE_EVAC_SEC:
-      # Secondary instances only
-      inst_fn = _GetNodeSecondaryInstances
-    else:
-      # All instances
-      assert self.op.mode == constants.NODE_EVAC_ALL
-      inst_fn = _GetNodeInstances
-      # TODO: In 2.6, change the iallocator interface to take an evacuation mode
-      # per instance
-      raise errors.OpPrereqError("Due to an issue with the iallocator"
-                                 " interface it is not possible to evacuate"
-                                 " all instances at once; specify explicitly"
-                                 " whether to evacuate primary or secondary"
-                                 " instances",
-                                 errors.ECODE_INVAL)
-
-    return inst_fn(self.cfg, self.op.node_name)
-
-  def DeclareLocks(self, level):
-    if level == locking.LEVEL_INSTANCE:
-      # Lock instances optimistically, needs verification once node and group
-      # locks have been acquired
-      self.needed_locks[locking.LEVEL_INSTANCE] = \
-        set(i.name for i in self._DetermineInstances())
-
-    elif level == locking.LEVEL_NODEGROUP:
-      # Lock node groups for all potential target nodes optimistically, needs
-      # verification once nodes have been acquired
-      self.needed_locks[locking.LEVEL_NODEGROUP] = \
-        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)
-
-    elif level == locking.LEVEL_NODE:
-      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
-
-  def CheckPrereq(self):
-    # Verify locks
-    owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
-    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
-    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
-
-    need_nodes = self._DetermineNodes()
-
-    if not owned_nodes.issuperset(need_nodes):
-      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
-                                 " locks were acquired, current nodes are"
-                                 " are '%s', used to be '%s'; retry the"
-                                 " operation" %
-                                 (self.op.node_name,
-                                  utils.CommaJoin(need_nodes),
-                                  utils.CommaJoin(owned_nodes)),
-                                 errors.ECODE_STATE)
-
-    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
-    if owned_groups != wanted_groups:
-      raise errors.OpExecError("Node groups changed since locks were acquired,"
-                               " current groups are '%s', used to be '%s';"
-                               " retry the operation" %
-                               (utils.CommaJoin(wanted_groups),
-                                utils.CommaJoin(owned_groups)))
-
-    # Determine affected instances
-    self.instances = self._DetermineInstances()
-    self.instance_names = [i.name for i in self.instances]
-
-    if set(self.instance_names) != owned_instances:
-      raise errors.OpExecError("Instances on node '%s' changed since locks"
-                               " were acquired, current instances are '%s',"
-                               " used to be '%s'; retry the operation" %
-                               (self.op.node_name,
-                                utils.CommaJoin(self.instance_names),
-                                utils.CommaJoin(owned_instances)))
-
-    if self.instance_names:
-      self.LogInfo("Evacuating instances from node '%s': %s",
-                   self.op.node_name,
-                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
-    else:
-      self.LogInfo("No instances to evacuate from node '%s'",
-                   self.op.node_name)
-
-    if self.op.remote_node is not None:
-      for i in self.instances:
-        if i.primary_node == self.op.remote_node:
-          raise errors.OpPrereqError("Node %s is the primary node of"
-                                     " instance %s, cannot use it as"
-                                     " secondary" %
-                                     (self.op.remote_node, i.name),
-                                     errors.ECODE_INVAL)
-
-  def Exec(self, feedback_fn):
-    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)
-
-    if not self.instance_names:
-      # No instances to evacuate
-      jobs = []
-
-    elif self.op.iallocator is not None:
-      # TODO: Implement relocation to other group
-      evac_mode = self._MODE2IALLOCATOR[self.op.mode]
-      req = iallocator.IAReqNodeEvac(evac_mode=evac_mode,
-                                     instances=list(self.instance_names))
-      ial = iallocator.IAllocator(self.cfg, self.rpc, req)
-
-      ial.Run(self.op.iallocator)
-
-      if not ial.success:
-        raise errors.OpPrereqError("Can't compute node evacuation using"
-                                   " iallocator '%s': %s" %
-                                   (self.op.iallocator, ial.info),
-                                   errors.ECODE_NORES)
-
-      jobs = _LoadNodeEvacResult(self, ial.result, self.op.early_release, True)
-
-    elif self.op.remote_node is not None:
-      assert self.op.mode == constants.NODE_EVAC_SEC
-      jobs = [
-        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
-                                        remote_node=self.op.remote_node,
-                                        disks=[],
-                                        mode=constants.REPLACE_DISK_CHG,
-                                        early_release=self.op.early_release)]
-        for instance_name in self.instance_names]
-
-    else:
-      raise errors.ProgrammerError("No iallocator or remote node")
-
-    return ResultWithJobs(jobs)
-
-
 def _DiskSizeInBytesToMebibytes(lu, size):
   """Converts a disk size in bytes to mebibytes.
 
index 5d630c6..2d675b8 100644 (file)
@@ -37,6 +37,17 @@ from ganeti import ssconf
 from ganeti import utils
 
 
+# States of instance
+INSTANCE_DOWN = [constants.ADMINST_DOWN]
+INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
+INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE]
+
+#: Instance status in which an instance can be marked as offline/online
+CAN_CHANGE_INSTANCE_OFFLINE = (frozenset(INSTANCE_DOWN) | frozenset([
+  constants.ADMINST_OFFLINE,
+  ]))
+
+
 def _ExpandItemName(fn, name, kind):
   """Expand an item name.
 
@@ -873,3 +884,131 @@ def _MapInstanceDisksToNodes(instances):
               for inst in instances
               for (node, vols) in inst.MapLVsByNode().items()
               for vol in vols)
+
+
+def _CheckParamsNotGlobal(params, glob_pars, kind, bad_levels, good_levels):
+  """Make sure that none of the given paramters is global.
+
+  If a global parameter is found, an L{errors.OpPrereqError} exception is
+  raised. This is used to avoid setting global parameters for individual nodes.
+
+  @type params: dictionary
+  @param params: Parameters to check
+  @type glob_pars: frozenset or set of strings
+  @param glob_pars: Forbidden parameters
+  @type kind: string
+  @param kind: Kind of parameters (e.g. "node")
+  @type bad_levels: string
+  @param bad_levels: Level(s) at which the parameters are forbidden (e.g.
+      "instance")
+  @type good_levels: string
+  @param good_levels: Level(s) at which the parameters are allowed (e.g.
+      "cluster or group")
+
+  """
+  used_globals = glob_pars.intersection(params)
+  if used_globals:
+    msg = ("The following %s parameters are global and cannot"
+           " be customized at %s level, please modify them at"
+           " %s level: %s" %
+           (kind, bad_levels, good_levels, utils.CommaJoin(used_globals)))
+    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
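
The check above boils down to a set intersection between the submitted parameter names and the set of cluster-global parameter names. A standalone sketch of that rule, with a made-up GLOBAL_NDPARAMS set and ValueError standing in for errors.OpPrereqError:

GLOBAL_NDPARAMS = frozenset(["exclusive_storage"])  # hypothetical globals set

def check_params_not_global(params, glob_pars, kind, bad_level, good_level):
  used_globals = glob_pars.intersection(params)
  if used_globals:
    raise ValueError("The following %s parameters are global and cannot be"
                     " customized at %s level, please modify them at %s"
                     " level: %s" %
                     (kind, bad_level, good_level, ", ".join(used_globals)))

check_params_not_global({"oob_program": "/bin/true"}, GLOBAL_NDPARAMS,
                        "node", "node", "cluster or group")  # passes
# Passing {"exclusive_storage": True} instead would raise, because the key
# intersects the globals set.
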
+
+
+def _IsExclusiveStorageEnabledNode(cfg, node):
+  """Whether exclusive_storage is in effect for the given node.
+
+  @type cfg: L{config.ConfigWriter}
+  @param cfg: The cluster configuration
+  @type node: L{objects.Node}
+  @param node: The node
+  @rtype: bool
+  @return: The effective value of exclusive_storage
+
+  """
+  return cfg.GetNdParams(node)[constants.ND_EXCLUSIVE_STORAGE]
+
+
+def _CheckInstanceState(lu, instance, req_states, msg=None):
+  """Ensure that an instance is in one of the required states.
+
+  @param lu: the LU on behalf of which we make the check
+  @param instance: the instance to check
+  @param req_states: the required admin states, as a list of
+      C{constants.ADMINST_*} values
+  @param msg: if passed, should be a message to replace the default one
+  @raise errors.OpPrereqError: if the instance is not in the required state
+
+  """
+  if msg is None:
+    msg = ("can't use instance from outside %s states" %
+           utils.CommaJoin(req_states))
+  if instance.admin_state not in req_states:
+    raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" %
+                               (instance.name, instance.admin_state, msg),
+                               errors.ECODE_STATE)
+
+  if constants.ADMINST_UP not in req_states:
+    pnode = instance.primary_node
+    if not lu.cfg.GetNodeInfo(pnode).offline:
+      ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
+      ins_l.Raise("Can't contact node %s for instance information" % pnode,
+                  prereq=True, ecode=errors.ECODE_ENVIRON)
+      if instance.name in ins_l.payload:
+        raise errors.OpPrereqError("Instance %s is running, %s" %
+                                   (instance.name, msg), errors.ECODE_STATE)
+    else:
+      lu.LogWarning("Primary node offline, ignoring check that instance"
+                     " is down")
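
_CheckInstanceState combines a pure admin-state membership test with an RPC round trip that makes sure the instance is not actually running when it is not allowed to be. A simplified standalone model of the membership part (constants and the error class are stand-ins):

ADMINST_UP, ADMINST_DOWN, ADMINST_OFFLINE = "up", "down", "offline"

def check_admin_state(name, admin_state, req_states):
  if admin_state not in req_states:
    raise ValueError("Instance '%s' is marked to be %s, expected one of: %s" %
                     (name, admin_state, ", ".join(req_states)))

check_admin_state("inst1", ADMINST_DOWN, [ADMINST_DOWN])  # passes
# check_admin_state("inst1", ADMINST_UP, [ADMINST_DOWN]) would raise; the
# real helper additionally asks the primary node via RPC whether the
# instance process is running when ADMINST_UP is not among req_states.
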
+
+
+def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
+  """Check the sanity of iallocator and node arguments and use the
+  cluster-wide iallocator if appropriate.
+
+  Check that at most one of (iallocator, node) is specified. If none is
+  specified, or the iallocator is L{constants.DEFAULT_IALLOCATOR_SHORTCUT},
+  then the LU's opcode's iallocator slot is filled with the cluster-wide
+  default iallocator.
+
+  @type iallocator_slot: string
+  @param iallocator_slot: the name of the opcode iallocator slot
+  @type node_slot: string
+  @param node_slot: the name of the opcode target node slot
+
+  """
+  node = getattr(lu.op, node_slot, None)
+  ialloc = getattr(lu.op, iallocator_slot, None)
+  if node == []:
+    node = None
+
+  if node is not None and ialloc is not None:
+    raise errors.OpPrereqError("Do not specify both, iallocator and node",
+                               errors.ECODE_INVAL)
+  elif ((node is None and ialloc is None) or
+        ialloc == constants.DEFAULT_IALLOCATOR_SHORTCUT):
+    default_iallocator = lu.cfg.GetDefaultIAllocator()
+    if default_iallocator:
+      setattr(lu.op, iallocator_slot, default_iallocator)
+    else:
+      raise errors.OpPrereqError("No iallocator or node given and no"
+                                 " cluster-wide default iallocator found;"
+                                 " please specify either an iallocator or a"
+                                 " node, or set a cluster-wide default"
+                                 " iallocator", errors.ECODE_INVAL)
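
The precedence implemented above is: specifying both an iallocator and a node is an error; an explicit value wins; otherwise (or when the iallocator is the "default" shortcut) the cluster-wide default iallocator fills the slot, and the operation fails if none is configured. A standalone sketch of that rule, returning the resolved pair instead of mutating an opcode:

DEFAULT_SHORTCUT = "default"  # stands in for DEFAULT_IALLOCATOR_SHORTCUT

def resolve_allocation(node, ialloc, cluster_default):
  if node is not None and ialloc is not None:
    raise ValueError("Do not specify both an iallocator and a node")
  if (node is None and ialloc is None) or ialloc == DEFAULT_SHORTCUT:
    if not cluster_default:
      raise ValueError("No iallocator or node given and no cluster-wide"
                       " default iallocator found")
    return (node, cluster_default)
  return (node, ialloc)

print(resolve_allocation(None, None, "hail"))     # (None, 'hail')
print(resolve_allocation("node2", None, "hail"))  # ('node2', None)
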
+
+
+def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq):
+  """Check which of the instance's disks are faulty on a given node.
+
+  Queries the mirror status of all of the instance's disks on the node and
+  returns the list of indexes of the disks whose local disk status is
+  faulty.
+
+  """
+  faulty = []
+
+  for dev in instance.disks:
+    cfg.SetDiskID(dev, node_name)
+
+  result = rpc_runner.call_blockdev_getmirrorstatus(node_name, (instance.disks,
+                                                                instance))
+  result.Raise("Failed to get disk status from node %s" % node_name,
+               prereq=prereq, ecode=errors.ECODE_ENVIRON)
+
+  for idx, bdev_status in enumerate(result.payload):
+    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
+      faulty.append(idx)
+
+  return faulty
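
The helper above asks the node for the mirror status of every disk of the instance and keeps the indexes reported as locally faulty. A standalone sketch of the index-collection step, with a namedtuple standing in for the RPC payload entries and made-up LDS_* values:

import collections

FakeBdevStatus = collections.namedtuple("FakeBdevStatus", ["ldisk_status"])
LDS_OKAY, LDS_FAULTY = range(2)  # stand-ins for the real constants.LDS_*

def find_faulty_indexes(statuses):
  return [idx for idx, status in enumerate(statuses)
          if status and status.ldisk_status == LDS_FAULTY]

payload = [FakeBdevStatus(LDS_OKAY), None, FakeBdevStatus(LDS_FAULTY)]
print(find_faulty_indexes(payload))  # [2]; a None entry means "no status"
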
diff --git a/lib/cmdlib/node.py b/lib/cmdlib/node.py
new file mode 100644 (file)
index 0000000..b186059
--- /dev/null
@@ -0,0 +1,1569 @@
+#
+#
+
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+
+
+"""Logical units dealing with nodes."""
+
+import logging
+import operator
+
+from ganeti import constants
+from ganeti import errors
+from ganeti import locking
+from ganeti import netutils
+from ganeti import objects
+from ganeti import opcodes
+from ganeti import qlang
+from ganeti import query
+from ganeti import rpc
+from ganeti import utils
+from ganeti.masterd import iallocator
+
+from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, _QueryBase, \
+  ResultWithJobs
+from ganeti.cmdlib.common import _CheckParamsNotGlobal, \
+  _MergeAndVerifyHvState, _MergeAndVerifyDiskState, \
+  _IsExclusiveStorageEnabledNode, _CheckNodePVs, \
+  _RedistributeAncillaryFiles, _ExpandNodeName, _ShareAll, _SupportsOob, \
+  _CheckInstanceState, INSTANCE_DOWN, _GetUpdatedParams, \
+  _AdjustCandidatePool, _CheckIAllocatorOrNode, _LoadNodeEvacResult, \
+  _GetWantedNodes, _MapInstanceDisksToNodes, _RunPostHook, \
+  _FindFaultyInstanceDisks
+
+
+def _DecideSelfPromotion(lu, exceptions=None):
+  """Decide whether I should promote myself as a master candidate.
+
+  """
+  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
+  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
+  # the new node will increase mc_max by one, so:
+  mc_should = min(mc_should + 1, cp_size)
+  return mc_now < mc_should
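
_DecideSelfPromotion is pure candidate-pool arithmetic: counting the node being added (or re-configured), the cluster should have min(mc_should + 1, candidate_pool_size) master candidates, and the node promotes itself if the current count falls short. The same rule in isolation:

def decide_self_promotion(candidate_pool_size, mc_now, mc_should):
  mc_should = min(mc_should + 1, candidate_pool_size)
  return mc_now < mc_should

print(decide_self_promotion(10, 3, 3))  # True: the pool allows 4, only 3 exist
print(decide_self_promotion(3, 3, 3))   # False: the pool size is already reached
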
+
+
+def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
+  """Ensure that a node has the given secondary ip.
+
+  @type lu: L{LogicalUnit}
+  @param lu: the LU on behalf of which we make the check
+  @type node: string
+  @param node: the node to check
+  @type secondary_ip: string
+  @param secondary_ip: the ip to check
+  @type prereq: boolean
+  @param prereq: whether to throw a prerequisite or an execute error
+  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
+  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
+
+  """
+  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
+  result.Raise("Failure checking secondary ip on node %s" % node,
+               prereq=prereq, ecode=errors.ECODE_ENVIRON)
+  if not result.payload:
+    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
+           " please fix and re-run this command" % secondary_ip)
+    if prereq:
+      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
+    else:
+      raise errors.OpExecError(msg)
+
+
+class LUNodeAdd(LogicalUnit):
+  """Logical unit for adding node to the cluster.
+
+  """
+  HPATH = "node-add"
+  HTYPE = constants.HTYPE_NODE
+  _NFLAGS = ["master_capable", "vm_capable"]
+
+  def CheckArguments(self):
+    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
+    # validate/normalize the node name
+    self.hostname = netutils.GetHostname(name=self.op.node_name,
+                                         family=self.primary_ip_family)
+    self.op.node_name = self.hostname.name
+
+    if self.op.readd and self.op.node_name == self.cfg.GetMasterNode():
+      raise errors.OpPrereqError("Cannot readd the master node",
+                                 errors.ECODE_STATE)
+
+    if self.op.readd and self.op.group:
+      raise errors.OpPrereqError("Cannot pass a node group when a node is"
+                                 " being readded", errors.ECODE_INVAL)
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    This will run on all nodes before, and on all nodes + the new node after.
+
+    """
+    return {
+      "OP_TARGET": self.op.node_name,
+      "NODE_NAME": self.op.node_name,
+      "NODE_PIP": self.op.primary_ip,
+      "NODE_SIP": self.op.secondary_ip,
+      "MASTER_CAPABLE": str(self.op.master_capable),
+      "VM_CAPABLE": str(self.op.vm_capable),
+      }
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
+    # Exclude added node
+    pre_nodes = list(set(self.cfg.GetNodeList()) - set([self.op.node_name]))
+    post_nodes = pre_nodes + [self.op.node_name, ]
+
+    return (pre_nodes, post_nodes)
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    This checks:
+     - the new node is not already in the config
+     - it is resolvable
+     - its parameters (single/dual homed) match the cluster
+
+    Any errors are signaled by raising errors.OpPrereqError.
+
+    """
+    cfg = self.cfg
+    hostname = self.hostname
+    node = hostname.name
+    primary_ip = self.op.primary_ip = hostname.ip
+    if self.op.secondary_ip is None:
+      if self.primary_ip_family == netutils.IP6Address.family:
+        raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
+                                   " IPv4 address must be given as secondary",
+                                   errors.ECODE_INVAL)
+      self.op.secondary_ip = primary_ip
+
+    secondary_ip = self.op.secondary_ip
+    if not netutils.IP4Address.IsValid(secondary_ip):
+      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
+                                 " address" % secondary_ip, errors.ECODE_INVAL)
+
+    node_list = cfg.GetNodeList()
+    if not self.op.readd and node in node_list:
+      raise errors.OpPrereqError("Node %s is already in the configuration" %
+                                 node, errors.ECODE_EXISTS)
+    elif self.op.readd and node not in node_list:
+      raise errors.OpPrereqError("Node %s is not in the configuration" % node,
+                                 errors.ECODE_NOENT)
+
+    self.changed_primary_ip = False
+
+    for existing_node_name, existing_node in cfg.GetMultiNodeInfo(node_list):
+      if self.op.readd and node == existing_node_name:
+        if existing_node.secondary_ip != secondary_ip:
+          raise errors.OpPrereqError("Readded node doesn't have the same IP"
+                                     " address configuration as before",
+                                     errors.ECODE_INVAL)
+        if existing_node.primary_ip != primary_ip:
+          self.changed_primary_ip = True
+
+        continue
+
+      if (existing_node.primary_ip == primary_ip or
+          existing_node.secondary_ip == primary_ip or
+          existing_node.primary_ip == secondary_ip or
+          existing_node.secondary_ip == secondary_ip):
+        raise errors.OpPrereqError("New node ip address(es) conflict with"
+                                   " existing node %s" % existing_node.name,
+                                   errors.ECODE_NOTUNIQUE)
+
+    # After this 'if' block, None is no longer a valid value for the
+    # _capable op attributes
+    if self.op.readd:
+      old_node = self.cfg.GetNodeInfo(node)
+      assert old_node is not None, "Can't retrieve locked node %s" % node
+      for attr in self._NFLAGS:
+        if getattr(self.op, attr) is None:
+          setattr(self.op, attr, getattr(old_node, attr))
+    else:
+      for attr in self._NFLAGS:
+        if getattr(self.op, attr) is None:
+          setattr(self.op, attr, True)
+
+    if self.op.readd and not self.op.vm_capable:
+      pri, sec = cfg.GetNodeInstances(node)
+      if pri or sec:
+        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
+                                   " flag set to false, but it already holds"
+                                   " instances" % node,
+                                   errors.ECODE_STATE)
+
+    # check that the type of the node (single versus dual homed) is the
+    # same as for the master
+    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
+    master_singlehomed = myself.secondary_ip == myself.primary_ip
+    newbie_singlehomed = secondary_ip == primary_ip
+    if master_singlehomed != newbie_singlehomed:
+      if master_singlehomed:
+        raise errors.OpPrereqError("The master has no secondary ip but the"
+                                   " new node has one",
+                                   errors.ECODE_INVAL)
+      else:
+        raise errors.OpPrereqError("The master has a secondary ip but the"
+                                   " new node doesn't have one",
+                                   errors.ECODE_INVAL)
+
+    # checks reachability
+    if not netutils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
+      raise errors.OpPrereqError("Node not reachable by ping",
+                                 errors.ECODE_ENVIRON)
+
+    if not newbie_singlehomed:
+      # check reachability from my secondary ip to newbie's secondary ip
+      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
+                              source=myself.secondary_ip):
+        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
+                                   " based ping to node daemon port",
+                                   errors.ECODE_ENVIRON)
+
+    if self.op.readd:
+      exceptions = [node]
+    else:
+      exceptions = []
+
+    if self.op.master_capable:
+      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
+    else:
+      self.master_candidate = False
+
+    if self.op.readd:
+      self.new_node = old_node
+    else:
+      node_group = cfg.LookupNodeGroup(self.op.group)
+      self.new_node = objects.Node(name=node,
+                                   primary_ip=primary_ip,
+                                   secondary_ip=secondary_ip,
+                                   master_candidate=self.master_candidate,
+                                   offline=False, drained=False,
+                                   group=node_group, ndparams={})
+
+    if self.op.ndparams:
+      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
+      _CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
+                            "node", "cluster or group")
+
+    if self.op.hv_state:
+      self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state, None)
+
+    if self.op.disk_state:
+      self.new_disk_state = _MergeAndVerifyDiskState(self.op.disk_state, None)
+
+    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
+    #       it a property on the base class.
+    rpcrunner = rpc.DnsOnlyRunner()
+    result = rpcrunner.call_version([node])[node]
+    result.Raise("Can't get version information from node %s" % node)
+    if constants.PROTOCOL_VERSION == result.payload:
+      logging.info("Communication to node %s fine, sw version %s match",
+                   node, result.payload)
+    else:
+      raise errors.OpPrereqError("Version mismatch master version %s,"
+                                 " node version %s" %
+                                 (constants.PROTOCOL_VERSION, result.payload),
+                                 errors.ECODE_ENVIRON)
+
+    vg_name = cfg.GetVGName()
+    if vg_name is not None:
+      vparams = {constants.NV_PVLIST: [vg_name]}
+      excl_stor = _IsExclusiveStorageEnabledNode(cfg, self.new_node)
+      cname = self.cfg.GetClusterName()
+      result = rpcrunner.call_node_verify_light([node], vparams, cname)[node]
+      (errmsgs, _) = _CheckNodePVs(result.payload, excl_stor)
+      if errmsgs:
+        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
+                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)
+
+  def Exec(self, feedback_fn):
+    """Adds the new node to the cluster.
+
+    """
+    new_node = self.new_node
+    node = new_node.name
+
+    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
+      "Not owning BGL"
+
+    # We are adding a new node, so we assume it's powered
+    new_node.powered = True
+
+    # for re-adds, reset the offline/drained/master-candidate flags;
+    # we need to reset here, otherwise offline would prevent RPC calls
+    # later in the procedure; this also means that if the re-add
+    # fails, we are left with a non-offlined, broken node
+    if self.op.readd:
+      new_node.drained = new_node.offline = False # pylint: disable=W0201
+      self.LogInfo("Readding a node, the offline/drained flags were reset")
+      # if we demote the node, we do cleanup later in the procedure
+      new_node.master_candidate = self.master_candidate
+      if self.changed_primary_ip:
+        new_node.primary_ip = self.op.primary_ip
+
+    # copy the master/vm_capable flags
+    for attr in self._NFLAGS:
+      setattr(new_node, attr, getattr(self.op, attr))
+
+    # notify the user about any possible mc promotion
+    if new_node.master_candidate:
+      self.LogInfo("Node will be a master candidate")
+
+    if self.op.ndparams:
+      new_node.ndparams = self.op.ndparams
+    else:
+      new_node.ndparams = {}
+
+    if self.op.hv_state:
+      new_node.hv_state_static = self.new_hv_state
+
+    if self.op.disk_state:
+      new_node.disk_state_static = self.new_disk_state
+
+    # Add node to our /etc/hosts, and add key to known_hosts
+    if self.cfg.GetClusterInfo().modify_etc_hosts:
+      master_node = self.cfg.GetMasterNode()
+      result = self.rpc.call_etc_hosts_modify(master_node,
+                                              constants.ETC_HOSTS_ADD,
+                                              self.hostname.name,
+                                              self.hostname.ip)
+      result.Raise("Can't update hosts file with new host data")
+
+    if new_node.secondary_ip != new_node.primary_ip:
+      _CheckNodeHasSecondaryIP(self, new_node.name, new_node.secondary_ip,
+                               False)
+
+    node_verify_list = [self.cfg.GetMasterNode()]
+    node_verify_param = {
+      constants.NV_NODELIST: ([node], {}),
+      # TODO: do a node-net-test as well?
+    }
+
+    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
+                                       self.cfg.GetClusterName())
+    for verifier in node_verify_list:
+      result[verifier].Raise("Cannot communicate with node %s" % verifier)
+      nl_payload = result[verifier].payload[constants.NV_NODELIST]
+      if nl_payload:
+        for failed in nl_payload:
+          feedback_fn("ssh/hostname verification failed"
+                      " (checking from %s): %s" %
+                      (verifier, nl_payload[failed]))
+        raise errors.OpExecError("ssh/hostname verification failed")
+
+    if self.op.readd:
+      _RedistributeAncillaryFiles(self)
+      self.context.ReaddNode(new_node)
+      # make sure we redistribute the config
+      self.cfg.Update(new_node, feedback_fn)
+      # and make sure the new node will not have old files around
+      if not new_node.master_candidate:
+        result = self.rpc.call_node_demote_from_mc(new_node.name)
+        msg = result.fail_msg
+        if msg:
+          self.LogWarning("Node failed to demote itself from master"
+                          " candidate status: %s" % msg)
+    else:
+      _RedistributeAncillaryFiles(self, additional_nodes=[node],
+                                  additional_vm=self.op.vm_capable)
+      self.context.AddNode(new_node, self.proc.GetECId())
+
+
+class LUNodeSetParams(LogicalUnit):
+  """Modifies the parameters of a node.
+
+  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
+      to the node role (as _ROLE_*)
+  @cvar _R2F: a dictionary from node role to tuples of flags
+  @cvar _FLAGS: a list of attribute names corresponding to the flags
+
+  """
+  HPATH = "node-modify"
+  HTYPE = constants.HTYPE_NODE
+  REQ_BGL = False
+  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
+  _F2R = {
+    (True, False, False): _ROLE_CANDIDATE,
+    (False, True, False): _ROLE_DRAINED,
+    (False, False, True): _ROLE_OFFLINE,
+    (False, False, False): _ROLE_REGULAR,
+    }
+  _R2F = dict((v, k) for k, v in _F2R.items())
+  _FLAGS = ["master_candidate", "drained", "offline"]
+
+  def CheckArguments(self):
+    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
+    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
+                self.op.master_capable, self.op.vm_capable,
+                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
+                self.op.disk_state]
+    if all_mods.count(None) == len(all_mods):
+      raise errors.OpPrereqError("Please pass at least one modification",
+                                 errors.ECODE_INVAL)
+    if all_mods.count(True) > 1:
+      raise errors.OpPrereqError("Can't set the node into more than one"
+                                 " state at the same time",
+                                 errors.ECODE_INVAL)
+
+    # Boolean value that tells us whether we might be demoting from MC
+    self.might_demote = (self.op.master_candidate is False or
+                         self.op.offline is True or
+                         self.op.drained is True or
+                         self.op.master_capable is False)
+
+    if self.op.secondary_ip:
+      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
+        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
+                                   " address" % self.op.secondary_ip,
+                                   errors.ECODE_INVAL)
+
+    self.lock_all = self.op.auto_promote and self.might_demote
+    self.lock_instances = self.op.secondary_ip is not None
+
+  def _InstanceFilter(self, instance):
+    """Filter for getting affected instances.
+
+    """
+    return (instance.disk_template in constants.DTS_INT_MIRROR and
+            self.op.node_name in instance.all_nodes)
+
+  def ExpandNames(self):
+    if self.lock_all:
+      self.needed_locks = {
+        locking.LEVEL_NODE: locking.ALL_SET,
+
+        # Block allocations when all nodes are locked
+        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
+        }
+    else:
+      self.needed_locks = {
+        locking.LEVEL_NODE: self.op.node_name,
+        }
+
+    # Since modifying a node can have severe effects on currently running
+    # operations, the resource lock is at least acquired in shared mode
+    self.needed_locks[locking.LEVEL_NODE_RES] = \
+      self.needed_locks[locking.LEVEL_NODE]
+
+    # Get all locks except nodes in shared mode; they are not used for anything
+    # but read-only access
+    self.share_locks = _ShareAll()
+    self.share_locks[locking.LEVEL_NODE] = 0
+    self.share_locks[locking.LEVEL_NODE_RES] = 0
+    self.share_locks[locking.LEVEL_NODE_ALLOC] = 0
+
+    if self.lock_instances:
+      self.needed_locks[locking.LEVEL_INSTANCE] = \
+        frozenset(self.cfg.GetInstancesInfoByFilter(self._InstanceFilter))
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    This runs on the master node.
+
+    """
+    return {
+      "OP_TARGET": self.op.node_name,
+      "MASTER_CANDIDATE": str(self.op.master_candidate),
+      "OFFLINE": str(self.op.offline),
+      "DRAINED": str(self.op.drained),
+      "MASTER_CAPABLE": str(self.op.master_capable),
+      "VM_CAPABLE": str(self.op.vm_capable),
+      }
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
+    nl = [self.cfg.GetMasterNode(), self.op.node_name]
+    return (nl, nl)
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    This only checks the instance list against the existing names.
+
+    """
+    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
+
+    if self.lock_instances:
+      affected_instances = \
+        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
+
+      # Verify instance locks
+      owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
+      wanted_instances = frozenset(affected_instances.keys())
+      if wanted_instances - owned_instances:
+        raise errors.OpPrereqError("Instances affected by changing node %s's"
+                                   " secondary IP address have changed since"
+                                   " locks were acquired, wanted '%s', have"
+                                   " '%s'; retry the operation" %
+                                   (self.op.node_name,
+                                    utils.CommaJoin(wanted_instances),
+                                    utils.CommaJoin(owned_instances)),
+                                   errors.ECODE_STATE)
+    else:
+      affected_instances = None
+
+    if (self.op.master_candidate is not None or
+        self.op.drained is not None or
+        self.op.offline is not None):
+      # we can't change the master's node flags
+      if self.op.node_name == self.cfg.GetMasterNode():
+        raise errors.OpPrereqError("The master role can be changed"
+                                   " only via master-failover",
+                                   errors.ECODE_INVAL)
+
+    if self.op.master_candidate and not node.master_capable:
+      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
+                                 " it a master candidate" % node.name,
+                                 errors.ECODE_STATE)
+
+    if self.op.vm_capable is False:
+      (ipri, isec) = self.cfg.GetNodeInstances(self.op.node_name)
+      if ipri or isec:
+        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
+                                   " the vm_capable flag" % node.name,
+                                   errors.ECODE_STATE)
+
+    if node.master_candidate and self.might_demote and not self.lock_all:
+      assert not self.op.auto_promote, "auto_promote set but lock_all not"
+      # check if after removing the current node, we're missing master
+      # candidates
+      (mc_remaining, mc_should, _) = \
+          self.cfg.GetMasterCandidateStats(exceptions=[node.name])
+      if mc_remaining < mc_should:
+        raise errors.OpPrereqError("Not enough master candidates, please"
+                                   " pass auto promote option to allow"
+                                   " promotion (--auto-promote or RAPI"
+                                   " auto_promote=True)", errors.ECODE_STATE)
+
+    self.old_flags = old_flags = (node.master_candidate,
+                                  node.drained, node.offline)
+    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
+    self.old_role = old_role = self._F2R[old_flags]
+
+    # Check for ineffective changes
+    for attr in self._FLAGS:
+      if (getattr(self.op, attr) is False and getattr(node, attr) is False):
+        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
+        setattr(self.op, attr, None)
+
+    # Past this point, any flag change to False means a transition
+    # away from the respective state, as only real changes are kept
+
+    # TODO: We might query the real power state if it supports OOB
+    if _SupportsOob(self.cfg, node):
+      if self.op.offline is False and not (node.powered or
+                                           self.op.powered is True):
+        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
+                                    " offline status can be reset") %
+                                   self.op.node_name, errors.ECODE_STATE)
+    elif self.op.powered is not None:
+      raise errors.OpPrereqError(("Unable to change powered state for node %s"
+                                  " as it does not support out-of-band"
+                                  " handling") % self.op.node_name,
+                                 errors.ECODE_STATE)
+
+    # If we're being deofflined/drained, we'll MC ourself if needed
+    if (self.op.drained is False or self.op.offline is False or
+        (self.op.master_capable and not node.master_capable)):
+      if _DecideSelfPromotion(self):
+        self.op.master_candidate = True
+        self.LogInfo("Auto-promoting node to master candidate")
+
+    # If we're no longer master capable, we'll demote ourselves from MC
+    if self.op.master_capable is False and node.master_candidate:
+      self.LogInfo("Demoting from master candidate")
+      self.op.master_candidate = False
+
+    # Compute new role
+    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
+    if self.op.master_candidate:
+      new_role = self._ROLE_CANDIDATE
+    elif self.op.drained:
+      new_role = self._ROLE_DRAINED
+    elif self.op.offline:
+      new_role = self._ROLE_OFFLINE
+    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
+      # False is still in new flags, which means we're un-setting (the
+      # only) True flag
+      new_role = self._ROLE_REGULAR
+    else: # no new flags, nothing, keep old role
+      new_role = old_role
+
+    self.new_role = new_role
+
+    if old_role == self._ROLE_OFFLINE and new_role != old_role:
+      # Trying to transition out of offline status
+      result = self.rpc.call_version([node.name])[node.name]
+      if result.fail_msg:
+        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
+                                   " to report its version: %s" %
+                                   (node.name, result.fail_msg),
+                                   errors.ECODE_STATE)
+      else:
+        self.LogWarning("Transitioning node from offline to online state"
+                        " without using re-add. Please make sure the node"
+                        " is healthy!")
+
+    # When changing the secondary ip, verify if this is a single-homed to
+    # multi-homed transition or vice versa, and apply the relevant
+    # restrictions.
+    if self.op.secondary_ip:
+      # Ok even without locking, because this can't be changed by any LU
+      master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
+      master_singlehomed = master.secondary_ip == master.primary_ip
+      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
+        if self.op.force and node.name == master.name:
+          self.LogWarning("Transitioning from single-homed to multi-homed"
+                          " cluster; all nodes will require a secondary IP"
+                          " address")
+        else:
+          raise errors.OpPrereqError("Changing the secondary ip on a"
+                                     " single-homed cluster requires the"
+                                     " --force option to be passed, and the"
+                                     " target node to be the master",
+                                     errors.ECODE_INVAL)
+      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
+        if self.op.force and node.name == master.name:
+          self.LogWarning("Transitioning from multi-homed to single-homed"
+                          " cluster; secondary IP addresses will have to be"
+                          " removed")
+        else:
+          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
+                                     " same as the primary IP on a multi-homed"
+                                     " cluster, unless the --force option is"
+                                     " passed, and the target node is the"
+                                     " master", errors.ECODE_INVAL)
+
+      assert not (frozenset(affected_instances) -
+                  self.owned_locks(locking.LEVEL_INSTANCE))
+
+      if node.offline:
+        if affected_instances:
+          msg = ("Cannot change secondary IP address: offline node has"
+                 " instances (%s) configured to use it" %
+                 utils.CommaJoin(affected_instances.keys()))
+          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
+      else:
+        # On online nodes, check that no instances are running, and that
+        # the node has the new ip and we can reach it.
+        for instance in affected_instances.values():
+          _CheckInstanceState(self, instance, INSTANCE_DOWN,
+                              msg="cannot change secondary ip")
+
+        _CheckNodeHasSecondaryIP(self, node.name, self.op.secondary_ip, True)
+        if master.name != node.name:
+          # check reachability from master secondary ip to new secondary ip
+          if not netutils.TcpPing(self.op.secondary_ip,
+                                  constants.DEFAULT_NODED_PORT,
+                                  source=master.secondary_ip):
+            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
+                                       " based ping to node daemon port",
+                                       errors.ECODE_ENVIRON)
+
+    if self.op.ndparams:
+      new_ndparams = _GetUpdatedParams(self.node.ndparams, self.op.ndparams)
+      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
+      _CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
+                            "node", "cluster or group")
+      self.new_ndparams = new_ndparams
+
+    if self.op.hv_state:
+      self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state,
+                                                 self.node.hv_state_static)
+
+    if self.op.disk_state:
+      self.new_disk_state = \
+        _MergeAndVerifyDiskState(self.op.disk_state,
+                                 self.node.disk_state_static)
+
+  def Exec(self, feedback_fn):
+    """Modifies a node.
+
+    """
+    node = self.node
+    old_role = self.old_role
+    new_role = self.new_role
+
+    result = []
+
+    if self.op.ndparams:
+      node.ndparams = self.new_ndparams
+
+    if self.op.powered is not None:
+      node.powered = self.op.powered
+
+    if self.op.hv_state:
+      node.hv_state_static = self.new_hv_state
+
+    if self.op.disk_state:
+      node.disk_state_static = self.new_disk_state
+
+    for attr in ["master_capable", "vm_capable"]:
+      val = getattr(self.op, attr)
+      if val is not None:
+        setattr(node, attr, val)
+        result.append((attr, str(val)))
+
+    if new_role != old_role:
+      # Tell the node to demote itself, if no longer MC and not offline
+      if old_role == self._ROLE_CANDIDATE and new_role != self._ROLE_OFFLINE:
+        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
+        if msg:
+          self.LogWarning("Node failed to demote itself: %s", msg)
+
+      new_flags = self._R2F[new_role]
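+      # Report each changed flag in the result and apply the new flag values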
+      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
+        if of != nf:
+          result.append((desc, str(nf)))
+      (node.master_candidate, node.drained, node.offline) = new_flags
+
+      # we locked all nodes, we adjust the CP before updating this node
+      if self.lock_all:
+        _AdjustCandidatePool(self, [node.name])
+
+    if self.op.secondary_ip:
+      node.secondary_ip = self.op.secondary_ip
+      result.append(("secondary_ip", self.op.secondary_ip))
+
+    # this will trigger configuration file update, if needed
+    self.cfg.Update(node, feedback_fn)
+
+    # this will trigger job queue propagation or cleanup if the mc
+    # flag changed
+    if [old_role, new_role].count(self._ROLE_CANDIDATE) == 1:
+      self.context.ReaddNode(node)
+
+    return result
+
+
+class LUNodePowercycle(NoHooksLU):
+  """Powercycles a node.
+
+  """
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
+    if self.op.node_name == self.cfg.GetMasterNode() and not self.op.force:
+      raise errors.OpPrereqError("The node is the master and the force"
+                                 " parameter was not set",
+                                 errors.ECODE_INVAL)
+
+  def ExpandNames(self):
+    """Locking for PowercycleNode.
+
+    This is a last-resort option and shouldn't block on other
+    jobs. Therefore, we grab no locks.
+
+    """
+    self.needed_locks = {}
+
+  def Exec(self, feedback_fn):
+    """Reboots a node.
+
+    """
+    result = self.rpc.call_node_powercycle(self.op.node_name,
+                                           self.cfg.GetHypervisorType())
+    result.Raise("Failed to schedule the reboot")
+    return result.payload
+
+
+def _GetNodeInstancesInner(cfg, fn):
+  """Returns the instances for which C{fn} returns true."""
+  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
+
+
+def _GetNodePrimaryInstances(cfg, node_name):
+  """Returns primary instances on a node.
+
+  """
+  return _GetNodeInstancesInner(cfg,
+                                lambda inst: node_name == inst.primary_node)
+
+
+def _GetNodeSecondaryInstances(cfg, node_name):
+  """Returns secondary instances on a node.
+
+  """
+  return _GetNodeInstancesInner(cfg,
+                                lambda inst: node_name in inst.secondary_nodes)
+
+
+def _GetNodeInstances(cfg, node_name):
+  """Returns a list of all primary and secondary instances on a node.
+
+  """
+  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
+
+
+class LUNodeEvacuate(NoHooksLU):
+  """Evacuates instances off a list of nodes.
+
+  """
+  REQ_BGL = False
+
+  _MODE2IALLOCATOR = {
+    constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI,
+    constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC,
+    constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL,
+    }
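+  # The mapping must cover every evacuation mode and iallocator mode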
+  assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES
+  assert (frozenset(_MODE2IALLOCATOR.values()) ==
+          constants.IALLOCATOR_NEVAC_MODES)
+
+  def CheckArguments(self):
+    _CheckIAllocatorOrNode(self, "iallocator", "remote_node")
+
+  def ExpandNames(self):
+    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
+
+    if self.op.remote_node is not None:
+      self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
+      assert self.op.remote_node
+
+      if self.op.remote_node == self.op.node_name:
+        raise errors.OpPrereqError("Can not use evacuated node as a new"
+                                   " secondary node", errors.ECODE_INVAL)
+
+      if self.op.mode != constants.NODE_EVAC_SEC:
+        raise errors.OpPrereqError("Without the use of an iallocator only"
+                                   " secondary instances can be evacuated",
+                                   errors.ECODE_INVAL)
+
+    # Declare locks
+    self.share_locks = _ShareAll()
+    self.needed_locks = {
+      locking.LEVEL_INSTANCE: [],
+      locking.LEVEL_NODEGROUP: [],
+      locking.LEVEL_NODE: [],
+      }
+
+    # Determine nodes (via group) optimistically, needs verification once locks
+    # have been acquired
+    self.lock_nodes = self._DetermineNodes()
+
+  def _DetermineNodes(self):
+    """Gets the list of nodes to operate on.
+
+    """
+    if self.op.remote_node is None:
+      # Iallocator will choose any node(s) in the same group
+      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_name])
+    else:
+      group_nodes = frozenset([self.op.remote_node])
+
+    # Determine nodes to be locked
+    return set([self.op.node_name]) | group_nodes
+
+  def _DetermineInstances(self):
+    """Builds list of instances to operate on.
+
+    """
+    assert self.op.mode in constants.NODE_EVAC_MODES
+
+    if self.op.mode == constants.NODE_EVAC_PRI:
+      # Primary instances only
+      inst_fn = _GetNodePrimaryInstances
+      assert self.op.remote_node is None, \
+        "Evacuating primary instances requires iallocator"
+    elif self.op.mode == constants.NODE_EVAC_SEC:
+      # Secondary instances only
+      inst_fn = _GetNodeSecondaryInstances
+    else:
+      # All instances
+      assert self.op.mode == constants.NODE_EVAC_ALL
+      inst_fn = _GetNodeInstances
+      # TODO: In 2.6, change the iallocator interface to take an evacuation mode
+      # per instance
+      raise errors.OpPrereqError("Due to an issue with the iallocator"
+                                 " interface it is not possible to evacuate"
+                                 " all instances at once; specify explicitly"
+                                 " whether to evacuate primary or secondary"
+                                 " instances",
+                                 errors.ECODE_INVAL)
+
+    return inst_fn(self.cfg, self.op.node_name)
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_INSTANCE:
+      # Lock instances optimistically, needs verification once node and group
+      # locks have been acquired
+      self.needed_locks[locking.LEVEL_INSTANCE] = \
+        set(i.name for i in self._DetermineInstances())
+
+    elif level == locking.LEVEL_NODEGROUP:
+      # Lock node groups for all potential target nodes optimistically, needs
+      # verification once nodes have been acquired
+      self.needed_locks[locking.LEVEL_NODEGROUP] = \
+        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)
+
+    elif level == locking.LEVEL_NODE:
+      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
+
+  def CheckPrereq(self):
+    # Verify locks
+    owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
+    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
+    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
+
+    need_nodes = self._DetermineNodes()
+
+    if not owned_nodes.issuperset(need_nodes):
+      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
+                                 " locks were acquired, current nodes are"
+                                 " '%s', used to be '%s'; retry the"
+                                 " operation" %
+                                 (self.op.node_name,
+                                  utils.CommaJoin(need_nodes),
+                                  utils.CommaJoin(owned_nodes)),
+                                 errors.ECODE_STATE)
+
+    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
+    if owned_groups != wanted_groups:
+      raise errors.OpExecError("Node groups changed since locks were acquired,"
+                               " current groups are '%s', used to be '%s';"
+                               " retry the operation" %
+                               (utils.CommaJoin(wanted_groups),
+                                utils.CommaJoin(owned_groups)))
+
+    # Determine affected instances
+    self.instances = self._DetermineInstances()
+    self.instance_names = [i.name for i in self.instances]
+
+    if set(self.instance_names) != owned_instances:
+      raise errors.OpExecError("Instances on node '%s' changed since locks"
+                               " were acquired, current instances are '%s',"
+                               " used to be '%s'; retry the operation" %
+                               (self.op.node_name,
+                                utils.CommaJoin(self.instance_names),
+                                utils.CommaJoin(owned_instances)))
+
+    if self.instance_names:
+      self.LogInfo("Evacuating instances from node '%s': %s",
+                   self.op.node_name,
+                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
+    else:
+      self.LogInfo("No instances to evacuate from node '%s'",
+                   self.op.node_name)
+
+    if self.op.remote_node is not None:
+      for i in self.instances:
+        if i.primary_node == self.op.remote_node:
+          raise errors.OpPrereqError("Node %s is the primary node of"
+                                     " instance %s, cannot use it as"
+                                     " secondary" %
+                                     (self.op.remote_node, i.name),
+                                     errors.ECODE_INVAL)
+
+  def Exec(self, feedback_fn):
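+    # CheckArguments guarantees exactly one of iallocator/remote_node is set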
+    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)
+
+    if not self.instance_names:
+      # No instances to evacuate
+      jobs = []
+
+    elif self.op.iallocator is not None:
+      # TODO: Implement relocation to other group
+      evac_mode = self._MODE2IALLOCATOR[self.op.mode]
+      req = iallocator.IAReqNodeEvac(evac_mode=evac_mode,
+                                     instances=list(self.instance_names))
+      ial = iallocator.IAllocator(self.cfg, self.rpc, req)
+
+      ial.Run(self.op.iallocator)
+
+      if not ial.success:
+        raise errors.OpPrereqError("Can't compute node evacuation using"
+                                   " iallocator '%s': %s" %
+                                   (self.op.iallocator, ial.info),
+                                   errors.ECODE_NORES)
+
+      jobs = _LoadNodeEvacResult(self, ial.result, self.op.early_release, True)
+
+    elif self.op.remote_node is not None:
+      assert self.op.mode == constants.NODE_EVAC_SEC
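+      # Submit one job per instance, each replacing its secondary node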
+      jobs = [
+        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
+                                        remote_node=self.op.remote_node,
+                                        disks=[],
+                                        mode=constants.REPLACE_DISK_CHG,
+                                        early_release=self.op.early_release)]
+        for instance_name in self.instance_names]
+
+    else:
+      raise errors.ProgrammerError("No iallocator or remote node")
+
+    return ResultWithJobs(jobs)
+
+
+class LUNodeMigrate(LogicalUnit):
+  """Migrate all instances from a node.
+
+  """
+  HPATH = "node-migrate"
+  HTYPE = constants.HTYPE_NODE
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    pass
+
+  def ExpandNames(self):
+    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
+
+    self.share_locks = _ShareAll()
+    self.needed_locks = {
+      locking.LEVEL_NODE: [self.op.node_name],
+      }
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    This runs on the master node.
+
+    """
+    return {
+      "NODE_NAME": self.op.node_name,
+      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
+      }
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
+    nl = [self.cfg.GetMasterNode()]
+    return (nl, nl)
+
+  def CheckPrereq(self):
+    pass
+
+  def Exec(self, feedback_fn):
+    # Prepare one migration job per primary instance on this node
+    allow_runtime_changes = self.op.allow_runtime_changes
+    jobs = [
+      [opcodes.OpInstanceMigrate(instance_name=inst.name,
+                                 mode=self.op.mode,
+                                 live=self.op.live,
+                                 iallocator=self.op.iallocator,
+                                 target_node=self.op.target_node,
+                                 allow_runtime_changes=allow_runtime_changes,
+                                 ignore_ipolicy=self.op.ignore_ipolicy)]
+      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name)]
+
+    # TODO: Run iallocator in this opcode and pass correct placement options to
+    # OpInstanceMigrate. Since other jobs can modify the cluster between
+    # running the iallocator and the actual migration, a good consistency model
+    # will have to be found.
+
+    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
+            frozenset([self.op.node_name]))
+
+    return ResultWithJobs(jobs)
+
+
+def _GetStorageTypeArgs(cfg, storage_type):
+  """Returns the arguments for a storage type.
+
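+  For file storage (C{constants.ST_FILE}) this returns a one-element list
+  with the cluster's file and shared file storage directories; for all
+  other storage types an empty list is returned.
+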
+  """
+  # Special case for file storage
+  if storage_type == constants.ST_FILE:
+    # storage.FileStorage wants a list of storage directories
+    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
+
+  return []
+
+
+class LUNodeModifyStorage(NoHooksLU):
+  """Logical unit for modifying a storage volume on a node.
+
+  """
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
+
+    storage_type = self.op.storage_type
+
+    try:
+      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
+    except KeyError:
+      raise errors.OpPrereqError("Storage units of type '%s' can not be"
+                                 " modified" % storage_type,
+                                 errors.ECODE_INVAL)
+
+    diff = set(self.op.changes.keys()) - modifiable
+    if diff:
+      raise errors.OpPrereqError("The following fields can not be modified for"
+                                 " storage units of type '%s': %r" %
+                                 (storage_type, list(diff)),
+                                 errors.ECODE_INVAL)
+
+  def ExpandNames(self):
+    self.needed_locks = {
+      locking.LEVEL_NODE: self.op.node_name,
+      }
+
+  def Exec(self, feedback_fn):
+    """Computes the list of nodes and their attributes.
+
+    """
+    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
+    result = self.rpc.call_storage_modify(self.op.node_name,
+                                          self.op.storage_type, st_args,
+                                          self.op.name, self.op.changes)
+    result.Raise("Failed to modify storage unit '%s' on %s" %
+                 (self.op.name, self.op.node_name))
+
+
+class _NodeQuery(_QueryBase):
+  FIELDS = query.NODE_FIELDS
+
+  def ExpandNames(self, lu):
+    lu.needed_locks = {}
+    lu.share_locks = _ShareAll()
+
+    if self.names:
+      self.wanted = _GetWantedNodes(lu, self.names)
+    else:
+      self.wanted = locking.ALL_SET
+
+    self.do_locking = (self.use_locking and
+                       query.NQ_LIVE in self.requested_data)
+
+    if self.do_locking:
+      # If any non-static field is requested we need to lock the nodes
+      lu.needed_locks[locking.LEVEL_NODE] = self.wanted
+      lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
+
+  def DeclareLocks(self, lu, level):
+    pass
+
+  def _GetQueryData(self, lu):
+    """Computes the list of nodes and their attributes.
+
+    """
+    all_info = lu.cfg.GetAllNodesInfo()
+
+    nodenames = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)
+
+    # Gather data as requested
+    if query.NQ_LIVE in self.requested_data:
+      # filter out non-vm_capable nodes
+      toquery_nodes = [name for name in nodenames if all_info[name].vm_capable]
+
+      es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, toquery_nodes)
+      node_data = lu.rpc.call_node_info(toquery_nodes, [lu.cfg.GetVGName()],
+                                        [lu.cfg.GetHypervisorType()], es_flags)
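+      # Ignore nodes that failed to respond or returned no payload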
+      live_data = dict((name, rpc.MakeLegacyNodeInfo(nresult.payload))
+                       for (name, nresult) in node_data.items()
+                       if not nresult.fail_msg and nresult.payload)
+    else:
+      live_data = None
+
+    if query.NQ_INST in self.requested_data:
+      node_to_primary = dict([(name, set()) for name in nodenames])
+      node_to_secondary = dict([(name, set()) for name in nodenames])
+
+      inst_data = lu.cfg.GetAllInstancesInfo()
+
+      for inst in inst_data.values():
+        if inst.primary_node in node_to_primary:
+          node_to_primary[inst.primary_node].add(inst.name)
+        for secnode in inst.secondary_nodes:
+          if secnode in node_to_secondary:
+            node_to_secondary[secnode].add(inst.name)
+    else:
+      node_to_primary = None
+      node_to_secondary = None
+
+    if query.NQ_OOB in self.requested_data:
+      oob_support = dict((name, bool(_SupportsOob(lu.cfg, node)))
+                         for name, node in all_info.iteritems())
+    else:
+      oob_support = None
+
+    if query.NQ_GROUP in self.requested_data:
+      groups = lu.cfg.GetAllNodeGroupsInfo()
+    else:
+      groups = {}
+
+    return query.NodeQueryData([all_info[name] for name in nodenames],
+                               live_data, lu.cfg.GetMasterNode(),
+                               node_to_primary, node_to_secondary, groups,
+                               oob_support, lu.cfg.GetClusterInfo())
+
+
+class LUNodeQuery(NoHooksLU):
+  """Logical unit for querying nodes.
+
+  """
+  # pylint: disable=W0142
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    self.nq = _NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
+                         self.op.output_fields, self.op.use_locking)
+
+  def ExpandNames(self):
+    self.nq.ExpandNames(self)
+
+  def DeclareLocks(self, level):
+    self.nq.DeclareLocks(self, level)
+
+  def Exec(self, feedback_fn):
+    return self.nq.OldStyleQuery(self)
+
+
+def _CheckOutputFields(static, dynamic, selected):
+  """Checks whether all selected fields are valid.
+
+  @type static: L{utils.FieldSet}
+  @param static: static fields set
+  @type dynamic: L{utils.FieldSet}
+  @param dynamic: dynamic fields set
+  @type selected: list of strings
+  @param selected: the list of fields selected for the output
+  @raise errors.OpPrereqError: if any selected field is not known
+
+  """
+  f = utils.FieldSet()
+  f.Extend(static)
+  f.Extend(dynamic)
+
+  delta = f.NonMatching(selected)
+  if delta:
+    raise errors.OpPrereqError("Unknown output fields selected: %s"
+                               % ",".join(delta), errors.ECODE_INVAL)
+
+
+class LUNodeQueryvols(NoHooksLU):
+  """Logical unit for getting volumes on node(s).
+
+  """
+  REQ_BGL = False
+  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
+  _FIELDS_STATIC = utils.FieldSet("node")
+
+  def CheckArguments(self):
+    _CheckOutputFields(static=self._FIELDS_STATIC,
+                       dynamic=self._FIELDS_DYNAMIC,
+                       selected=self.op.output_fields)
+
+  def ExpandNames(self):
+    self.share_locks = _ShareAll()
+
+    if self.op.nodes:
+      self.needed_locks = {
+        locking.LEVEL_NODE: _GetWantedNodes(self, self.op.nodes),
+        }
+    else:
+      self.needed_locks = {
+        locking.LEVEL_NODE: locking.ALL_SET,
+        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
+        }
+
+  def Exec(self, feedback_fn):
+    """Computes the list of nodes and their attributes.
+
+    """
+    nodenames = self.owned_locks(locking.LEVEL_NODE)
+    volumes = self.rpc.call_node_volumes(nodenames)
+
+    ilist = self.cfg.GetAllInstancesInfo()
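+    # Map (node name, volume name) pairs to the owning instance's name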
+    vol2inst = _MapInstanceDisksToNodes(ilist.values())
+
+    output = []
+    for node in nodenames:
+      nresult = volumes[node]
+      if nresult.offline:
+        continue
+      msg = nresult.fail_msg
+      if msg:
+        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
+        continue
+
+      node_vols = sorted(nresult.payload,
+                         key=operator.itemgetter("dev"))
+
+      for vol in node_vols:
+        node_output = []
+        for field in self.op.output_fields:
+          if field == "node":
+            val = node
+          elif field == "phys":
+            val = vol["dev"]
+          elif field == "vg":
+            val = vol["vg"]
+          elif field == "name":
+            val = vol["name"]
+          elif field == "size":
+            val = int(float(vol["size"]))
+          elif field == "instance":
+            val = vol2inst.get((node, vol["vg"] + "/" + vol["name"]), "-")
+          else:
+            raise errors.ParameterError(field)
+          node_output.append(str(val))
+
+        output.append(node_output)
+
+    return output
+
+
+class LUNodeQueryStorage(NoHooksLU):
+  """Logical unit for getting information on storage units on node(s).
+
+  """
+  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    _CheckOutputFields(static=self._FIELDS_STATIC,
+                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
+                       selected=self.op.output_fields)
+
+  def ExpandNames(self):
+    self.share_locks = _ShareAll()
+
+    if self.op.nodes:
+      self.needed_locks = {
+        locking.LEVEL_NODE: _GetWantedNodes(self, self.op.nodes),
+        }
+    else:
+      self.needed_locks = {
+        locking.LEVEL_NODE: locking.ALL_SET,
+        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
+        }
+
+  def Exec(self, feedback_fn):
+    """Computes the list of nodes and their attributes.
+
+    """
+    self.nodes = self.owned_locks(locking.LEVEL_NODE)
+
+    # Always get name to sort by
+    if constants.SF_NAME in self.op.output_fields:
+      fields = self.op.output_fields[:]
+    else:
+      fields = [constants.SF_NAME] + self.op.output_fields
+
+    # Never ask for node or type as it's only known to the LU
+    for extra in [constants.SF_NODE, constants.SF_TYPE]:
+      while extra in fields:
+        fields.remove(extra)
+
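+    # Map each queried field name to its index in the result rows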
+    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
+    name_idx = field_idx[constants.SF_NAME]
+
+    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
+    data = self.rpc.call_storage_list(self.nodes,
+                                      self.op.storage_type, st_args,
+                                      self.op.name, fields)
+
+    result = []
+
+    for node in utils.NiceSort(self.nodes):
+      nresult = data[node]
+      if nresult.offline:
+        continue
+
+      msg = nresult.fail_msg
+      if msg:
+        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
+        continue
+
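+      # Index the returned rows by storage unit name for sorted output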
+      rows = dict([(row[name_idx], row) for row in nresult.payload])
+
+      for name in utils.NiceSort(rows.keys()):
+        row = rows[name]
+
+        out = []
+
+        for field in self.op.output_fields:
+          if field == constants.SF_NODE:
+            val = node
+          elif field == constants.SF_TYPE:
+            val = self.op.storage_type
+          elif field in field_idx:
+            val = row[field_idx[field]]
+          else:
+            raise errors.ParameterError(field)
+
+          out.append(val)
+
+        result.append(out)
+
+    return result
+
+
+class LUNodeRemove(LogicalUnit):
+  """Logical unit for removing a node.
+
+  """
+  HPATH = "node-remove"
+  HTYPE = constants.HTYPE_NODE
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    """
+    return {
+      "OP_TARGET": self.op.node_name,
+      "NODE_NAME": self.op.node_name,
+      }
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    This doesn't run on the target node in the pre phase as a failed
+    node would then be impossible to remove.
+
+    """
+    all_nodes = self.cfg.GetNodeList()
+    try:
+      all_nodes.remove(self.op.node_name)
+    except ValueError:
+      pass
+    return (all_nodes, all_nodes)
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    This checks:
+     - the node exists in the configuration
+     - it does not have primary or secondary instances
+     - it's not the master
+
+    Any errors are signaled by raising errors.OpPrereqError.
+
+    """
+    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
+    node = self.cfg.GetNodeInfo(self.op.node_name)
+    assert node is not None
+
+    masternode = self.cfg.GetMasterNode()
+    if node.name == masternode:
+      raise errors.OpPrereqError("Node is the master node, failover to another"
+                                 " node is required", errors.ECODE_INVAL)
+
+    for instance_name, instance in self.cfg.GetAllInstancesInfo().items():
+      if node.name in instance.all_nodes:
+        raise errors.OpPrereqError("Instance %s is still running on the node,"
+                                   " please remove first" % instance_name,
+                                   errors.ECODE_INVAL)
+    self.op.node_name = node.name
+    self.node = node
+
+  def Exec(self, feedback_fn):
+    """Removes the node from the cluster.
+
+    """
+    node = self.node
+    logging.info("Stopping the node daemon and removing configs from node %s",
+                 node.name)
+
+    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
+
+    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
+      "Not owning BGL"
+
+    # Promote nodes to master candidate as needed
+    _AdjustCandidatePool(self, exceptions=[node.name])
+    self.context.RemoveNode(node.name)
+
+    # Run post hooks on the node before it's removed
+    _RunPostHook(self, node.name)
+
+    result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
+    msg = result.fail_msg
+    if msg:
+      self.LogWarning("Errors encountered on the remote node while leaving"
+                      " the cluster: %s", msg)
+
+    # Remove node from our /etc/hosts
+    if self.cfg.GetClusterInfo().modify_etc_hosts:
+      master_node = self.cfg.GetMasterNode()
+      result = self.rpc.call_etc_hosts_modify(master_node,
+                                              constants.ETC_HOSTS_REMOVE,
+                                              node.name, None)
+      result.Raise("Can't update hosts file with new host data")
+      _RedistributeAncillaryFiles(self)
+
+
+class LURepairNodeStorage(NoHooksLU):
+  """Repairs the volume group on a node.
+
+  """
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
+
+    storage_type = self.op.storage_type
+
+    if (constants.SO_FIX_CONSISTENCY not in
+        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
+      raise errors.OpPrereqError("Storage units of type '%s' can not be"
+                                 " repaired" % storage_type,
+                                 errors.ECODE_INVAL)
+
+  def ExpandNames(self):
+    self.needed_locks = {
+      locking.LEVEL_NODE: [self.op.node_name],
+      }
+
+  def _CheckFaultyDisks(self, instance, node_name):
+    """Ensure faulty disks abort the opcode or at least warn."""
+    try:
+      if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
+                                  node_name, True):
+        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
+                                   " node '%s'" % (instance.name, node_name),
+                                   errors.ECODE_STATE)
+    except errors.OpPrereqError, err:
+      if self.op.ignore_consistency:
+        self.LogWarning(str(err.args[0]))
+      else:
+        raise
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    """
+    # Check whether any instance on this node has faulty disks
+    for inst in _GetNodeInstances(self.cfg, self.op.node_name):
+      if inst.admin_state != constants.ADMINST_UP:
+        continue
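+      # Check all of the instance's nodes except the one being repaired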
+      check_nodes = set(inst.all_nodes)
+      check_nodes.discard(self.op.node_name)
+      for inst_node_name in check_nodes:
+        self._CheckFaultyDisks(inst, inst_node_name)
+
+  def Exec(self, feedback_fn):
+    feedback_fn("Repairing storage unit '%s' on %s ..." %
+                (self.op.name, self.op.node_name))
+
+    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
+    result = self.rpc.call_storage_execute(self.op.node_name,
+                                           self.op.storage_type, st_args,
+                                           self.op.name,
+                                           constants.SO_FIX_CONSISTENCY)
+    result.Raise("Failed to repair storage unit '%s' on %s" %
+                 (self.op.name, self.op.node_name))
index e42bd19..2b24ef1 100755 (executable)
@@ -109,7 +109,7 @@ class TestIAllocatorChecks(testutils.GanetiTestCase):
     op = OpTest()
     lu = TestLU(op)
 
-    c_i = lambda: cmdlib._CheckIAllocatorOrNode(lu, "iallocator", "node")
+    c_i = lambda: common._CheckIAllocatorOrNode(lu, "iallocator", "node")
 
     # Neither node nor iallocator given
     for n in (None, []):