+
+
+class LURemoveExport(NoHooksLU):
+ """Remove exports related to the named instance.
+
+ """
+ _OP_REQP = ["instance_name"]
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+ """
+ pass
+
+ def Exec(self, feedback_fn):
+ """Remove any export.
+
+ """
+ instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
+ # If the instance was not found we'll try with the name that was passed in.
+ # This will only work if it was an FQDN, though.
+ fqdn_warn = False
+ if not instance_name:
+ fqdn_warn = True
+ instance_name = self.op.instance_name
+
+ op = opcodes.OpQueryExports(nodes=[])
+ exportlist = self.proc.ChainOpCode(op)
+ found = False
+ for node in exportlist:
+ if instance_name in exportlist[node]:
+ found = True
+ if not rpc.call_export_remove(node, instance_name):
+ logger.Error("could not remove export for instance %s"
+ " on node %s" % (instance_name, node))
+
+ if fqdn_warn and not found:
+ feedback_fn("Export not found. If trying to remove an export belonging"
+ " to a deleted instance please use its Fully Qualified"
+ " Domain Name.")
+
+
+class TagsLU(NoHooksLU):
+ """Generic tags LU.
+
+ This is an abstract class which is the parent of all the other tags LUs.
+
+ """
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ """
+ if self.op.kind == constants.TAG_CLUSTER:
+ self.target = self.cfg.GetClusterInfo()
+ elif self.op.kind == constants.TAG_NODE:
+ name = self.cfg.ExpandNodeName(self.op.name)
+ if name is None:
+ raise errors.OpPrereqError("Invalid node name (%s)" %
+ (self.op.name,))
+ self.op.name = name
+ self.target = self.cfg.GetNodeInfo(name)
+ elif self.op.kind == constants.TAG_INSTANCE:
+ name = self.cfg.ExpandInstanceName(self.op.name)
+ if name is None:
+ raise errors.OpPrereqError("Invalid instance name (%s)" %
+ (self.op.name,))
+ self.op.name = name
+ self.target = self.cfg.GetInstanceInfo(name)
+ else:
+ raise errors.OpPrereqError("Wrong tag type requested (%s)" %
+ str(self.op.kind))
+
+
+class LUGetTags(TagsLU):
+ """Returns the tags of a given object.
+
+ """
+ _OP_REQP = ["kind", "name"]
+
+ def Exec(self, feedback_fn):
+ """Returns the tag list.
+
+ """
+ return self.target.GetTags()
+
+
+class LUSearchTags(NoHooksLU):
+ """Searches the tags for a given pattern.
+
+ """
+ _OP_REQP = ["pattern"]
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ This checks the pattern passed for validity by compiling it.
+
+ """
+ try:
+ self.re = re.compile(self.op.pattern)
+ except re.error, err:
+ raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
+ (self.op.pattern, err))
+
+ def Exec(self, feedback_fn):
+ """Returns the tag list.
+
+ """
+ cfg = self.cfg
+ tgts = [("/cluster", cfg.GetClusterInfo())]
+ ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
+ tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
+ nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
+ tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
+ results = []
+ for path, target in tgts:
+ for tag in target.GetTags():
+ if self.re.search(tag):
+ results.append((path, tag))
+ return results
+
+
+class LUAddTags(TagsLU):
+ """Sets a tag on a given object.
+
+ """
+ _OP_REQP = ["kind", "name", "tags"]
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ This checks the type and length of the tag name and value.
+
+ """
+ TagsLU.CheckPrereq(self)
+ for tag in self.op.tags:
+ objects.TaggableObject.ValidateTag(tag)
+
+ def Exec(self, feedback_fn):
+ """Sets the tag.
+
+ """
+ try:
+ for tag in self.op.tags:
+ self.target.AddTag(tag)
+ except errors.TagError, err:
+ raise errors.OpExecError("Error while setting tag: %s" % str(err))
+ try:
+ self.cfg.Update(self.target)
+ except errors.ConfigurationError:
+ raise errors.OpRetryError("There has been a modification to the"
+ " config file and the operation has been"
+ " aborted. Please retry.")
+
+
+class LUDelTags(TagsLU):
+ """Delete a list of tags from a given object.
+
+ """
+ _OP_REQP = ["kind", "name", "tags"]
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ This checks that we have the given tag.
+
+ """
+ TagsLU.CheckPrereq(self)
+ for tag in self.op.tags:
+ objects.TaggableObject.ValidateTag(tag)
+ del_tags = frozenset(self.op.tags)
+ cur_tags = self.target.GetTags()
+ if not del_tags <= cur_tags:
+ diff_tags = del_tags - cur_tags
+ diff_names = ["'%s'" % tag for tag in diff_tags]
+ diff_names.sort()
+ raise errors.OpPrereqError("Tag(s) %s not found" %
+ (",".join(diff_names)))
+
+ def Exec(self, feedback_fn):
+ """Remove the tag from the object.
+
+ """
+ for tag in self.op.tags:
+ self.target.RemoveTag(tag)
+ try:
+ self.cfg.Update(self.target)
+ except errors.ConfigurationError:
+ raise errors.OpRetryError("There has been a modification to the"
+ " config file and the operation has been"
+ " aborted. Please retry.")
+
+class LUTestDelay(NoHooksLU):
+ """Sleep for a specified amount of time.
+
+ This LU sleeps on the master and/or nodes for a specified amount of
+ time.
+
+ """
+ _OP_REQP = ["duration", "on_master", "on_nodes"]
+ REQ_BGL = False
+
+ def ExpandNames(self):
+ """Expand names and set required locks.
+
+ This expands the node list, if any.
+
+ """
+ self.needed_locks = {}
+ if self.op.on_nodes:
+ # _GetWantedNodes can be used here, but is not always appropriate to use
+ # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
+ # more information.
+ self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
+ self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ """
+
+ def Exec(self, feedback_fn):
+ """Do the actual sleep.
+
+ """
+ if self.op.on_master:
+ if not utils.TestDelay(self.op.duration):
+ raise errors.OpExecError("Error during master delay test")
+ if self.op.on_nodes:
+ result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
+ if not result:
+ raise errors.OpExecError("Complete failure from rpc call")
+ for node, node_result in result.items():
+ if not node_result:
+ raise errors.OpExecError("Failure during rpc call to node %s,"
+ " result: %s" % (node, node_result))
+
+
+class IAllocator(object):
+ """IAllocator framework.
+
+ An IAllocator instance has three sets of attributes:
+ - cfg/sstore that are needed to query the cluster
+ - input data (all members of the _KEYS class attribute are required)
+ - four buffer attributes (in|out_data|text), that represent the
+ input (to the external script) in text and data structure format,
+ and the output from it, again in two formats
+ - the result variables from the script (success, info, nodes) for
+ easy usage
+
+ """
+ _ALLO_KEYS = [
+ "mem_size", "disks", "disk_template",
+ "os", "tags", "nics", "vcpus",
+ ]
+ _RELO_KEYS = [
+ "relocate_from",
+ ]
+
+ def __init__(self, cfg, sstore, mode, name, **kwargs):
+ self.cfg = cfg
+ self.sstore = sstore
+ # init buffer variables
+ self.in_text = self.out_text = self.in_data = self.out_data = None
+ # init all input fields so that pylint is happy
+ self.mode = mode
+ self.name = name
+ self.mem_size = self.disks = self.disk_template = None
+ self.os = self.tags = self.nics = self.vcpus = None
+ self.relocate_from = None
+ # computed fields
+ self.required_nodes = None
+ # init result fields
+ self.success = self.info = self.nodes = None
+ if self.mode == constants.IALLOCATOR_MODE_ALLOC:
+ keyset = self._ALLO_KEYS
+ elif self.mode == constants.IALLOCATOR_MODE_RELOC:
+ keyset = self._RELO_KEYS
+ else:
+ raise errors.ProgrammerError("Unknown mode '%s' passed to the"
+ " IAllocator" % self.mode)
+ for key in kwargs:
+ if key not in keyset:
+ raise errors.ProgrammerError("Invalid input parameter '%s' to"
+ " IAllocator" % key)
+ setattr(self, key, kwargs[key])
+ for key in keyset:
+ if key not in kwargs:
+ raise errors.ProgrammerError("Missing input parameter '%s' to"
+ " IAllocator" % key)
+ self._BuildInputData()
+
+ def _ComputeClusterData(self):
+ """Compute the generic allocator input data.
+
+ This is the data that is independent of the actual operation.
+
+ """
+ cfg = self.cfg
+ # cluster data
+ data = {
+ "version": 1,
+ "cluster_name": self.sstore.GetClusterName(),
+ "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
+ "hypervisor_type": self.sstore.GetHypervisorType(),
+ # we don't have job IDs
+ }
+
+ i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
+
+ # node data
+ node_results = {}
+ node_list = cfg.GetNodeList()
+ node_data = rpc.call_node_info(node_list, cfg.GetVGName())
+ for nname in node_list:
+ ninfo = cfg.GetNodeInfo(nname)
+ if nname not in node_data or not isinstance(node_data[nname], dict):
+ raise errors.OpExecError("Can't get data for node %s" % nname)
+ remote_info = node_data[nname]
+ for attr in ['memory_total', 'memory_free', 'memory_dom0',
+ 'vg_size', 'vg_free', 'cpu_total']:
+ if attr not in remote_info:
+ raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
+ (nname, attr))
+ try:
+ remote_info[attr] = int(remote_info[attr])
+ except ValueError, err:
+ raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
+ " %s" % (nname, attr, str(err)))
+ # compute memory used by primary instances
+ i_p_mem = i_p_up_mem = 0
+ for iinfo in i_list:
+ if iinfo.primary_node == nname:
+ i_p_mem += iinfo.memory
+ if iinfo.status == "up":
+ i_p_up_mem += iinfo.memory
+
+ # compute memory used by instances
+ pnr = {
+ "tags": list(ninfo.GetTags()),
+ "total_memory": remote_info['memory_total'],
+ "reserved_memory": remote_info['memory_dom0'],
+ "free_memory": remote_info['memory_free'],
+ "i_pri_memory": i_p_mem,
+ "i_pri_up_memory": i_p_up_mem,
+ "total_disk": remote_info['vg_size'],
+ "free_disk": remote_info['vg_free'],
+ "primary_ip": ninfo.primary_ip,
+ "secondary_ip": ninfo.secondary_ip,
+ "total_cpus": remote_info['cpu_total'],
+ }
+ node_results[nname] = pnr
+ data["nodes"] = node_results
+
+ # instance data
+ instance_data = {}
+ for iinfo in i_list:
+ nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
+ for n in iinfo.nics]
+ pir = {
+ "tags": list(iinfo.GetTags()),
+ "should_run": iinfo.status == "up",
+ "vcpus": iinfo.vcpus,
+ "memory": iinfo.memory,
+ "os": iinfo.os,
+ "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
+ "nics": nic_data,
+ "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
+ "disk_template": iinfo.disk_template,
+ }
+ instance_data[iinfo.name] = pir
+
+ data["instances"] = instance_data
+
+ self.in_data = data
+
+ def _AddNewInstance(self):
+ """Add new instance data to allocator structure.
+
+ This in combination with _AllocatorGetClusterData will create the
+ correct structure needed as input for the allocator.
+
+ The checks for the completeness of the opcode must have already been
+ done.
+
+ """
+ data = self.in_data
+ if len(self.disks) != 2:
+ raise errors.OpExecError("Only two-disk configurations supported")
+
+ disk_space = _ComputeDiskSize(self.disk_template,
+ self.disks[0]["size"], self.disks[1]["size"])
+
+ if self.disk_template in constants.DTS_NET_MIRROR:
+ self.required_nodes = 2
+ else:
+ self.required_nodes = 1
+ request = {
+ "type": "allocate",
+ "name": self.name,
+ "disk_template": self.disk_template,
+ "tags": self.tags,
+ "os": self.os,
+ "vcpus": self.vcpus,
+ "memory": self.mem_size,
+ "disks": self.disks,
+ "disk_space_total": disk_space,
+ "nics": self.nics,
+ "required_nodes": self.required_nodes,
+ }
+ data["request"] = request
+
+ def _AddRelocateInstance(self):
+ """Add relocate instance data to allocator structure.
+
+ This in combination with _IAllocatorGetClusterData will create the
+ correct structure needed as input for the allocator.
+
+ The checks for the completeness of the opcode must have already been
+ done.
+
+ """
+ instance = self.cfg.GetInstanceInfo(self.name)
+ if instance is None:
+ raise errors.ProgrammerError("Unknown instance '%s' passed to"
+ " IAllocator" % self.name)
+
+ if instance.disk_template not in constants.DTS_NET_MIRROR:
+ raise errors.OpPrereqError("Can't relocate non-mirrored instances")
+
+ if len(instance.secondary_nodes) != 1:
+ raise errors.OpPrereqError("Instance has not exactly one secondary node")
+
+ self.required_nodes = 1
+
+ disk_space = _ComputeDiskSize(instance.disk_template,
+ instance.disks[0].size,
+ instance.disks[1].size)
+
+ request = {
+ "type": "relocate",
+ "name": self.name,
+ "disk_space_total": disk_space,
+ "required_nodes": self.required_nodes,
+ "relocate_from": self.relocate_from,
+ }
+ self.in_data["request"] = request
+
+ def _BuildInputData(self):
+ """Build input data structures.
+
+ """
+ self._ComputeClusterData()
+
+ if self.mode == constants.IALLOCATOR_MODE_ALLOC:
+ self._AddNewInstance()
+ else:
+ self._AddRelocateInstance()
+
+ self.in_text = serializer.Dump(self.in_data)
+
+ def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
+ """Run an instance allocator and return the results.
+
+ """
+ data = self.in_text
+
+ result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
+
+ if not isinstance(result, tuple) or len(result) != 4:
+ raise errors.OpExecError("Invalid result from master iallocator runner")
+
+ rcode, stdout, stderr, fail = result
+
+ if rcode == constants.IARUN_NOTFOUND:
+ raise errors.OpExecError("Can't find allocator '%s'" % name)
+ elif rcode == constants.IARUN_FAILURE:
+ raise errors.OpExecError("Instance allocator call failed: %s,"
+ " output: %s" %
+ (fail, stdout+stderr))
+ self.out_text = stdout
+ if validate:
+ self._ValidateResult()
+
+ def _ValidateResult(self):
+ """Process the allocator results.
+
+ This will process and if successful save the result in
+ self.out_data and the other parameters.
+
+ """
+ try:
+ rdict = serializer.Load(self.out_text)
+ except Exception, err:
+ raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
+
+ if not isinstance(rdict, dict):
+ raise errors.OpExecError("Can't parse iallocator results: not a dict")
+
+ for key in "success", "info", "nodes":
+ if key not in rdict:
+ raise errors.OpExecError("Can't parse iallocator results:"
+ " missing key '%s'" % key)
+ setattr(self, key, rdict[key])
+
+ if not isinstance(rdict["nodes"], list):
+ raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
+ " is not a list")
+ self.out_data = rdict
+
+
+class LUTestAllocator(NoHooksLU):
+ """Run allocator tests.
+
+ This LU runs the allocator tests
+
+ """
+ _OP_REQP = ["direction", "mode", "name"]
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ This checks the opcode parameters depending on the director and mode test.
+
+ """
+ if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
+ for attr in ["name", "mem_size", "disks", "disk_template",
+ "os", "tags", "nics", "vcpus"]:
+ if not hasattr(self.op, attr):
+ raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
+ attr)
+ iname = self.cfg.ExpandInstanceName(self.op.name)
+ if iname is not None:
+ raise errors.OpPrereqError("Instance '%s' already in the cluster" %
+ iname)
+ if not isinstance(self.op.nics, list):
+ raise errors.OpPrereqError("Invalid parameter 'nics'")
+ for row in self.op.nics:
+ if (not isinstance(row, dict) or
+ "mac" not in row or
+ "ip" not in row or
+ "bridge" not in row):
+ raise errors.OpPrereqError("Invalid contents of the"
+ " 'nics' parameter")
+ if not isinstance(self.op.disks, list):
+ raise errors.OpPrereqError("Invalid parameter 'disks'")
+ if len(self.op.disks) != 2:
+ raise errors.OpPrereqError("Only two-disk configurations supported")
+ for row in self.op.disks:
+ if (not isinstance(row, dict) or
+ "size" not in row or
+ not isinstance(row["size"], int) or
+ "mode" not in row or
+ row["mode"] not in ['r', 'w']):
+ raise errors.OpPrereqError("Invalid contents of the"
+ " 'disks' parameter")
+ elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
+ if not hasattr(self.op, "name"):
+ raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
+ fname = self.cfg.ExpandInstanceName(self.op.name)
+ if fname is None:
+ raise errors.OpPrereqError("Instance '%s' not found for relocation" %
+ self.op.name)
+ self.op.name = fname
+ self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
+ else:
+ raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
+ self.op.mode)
+
+ if self.op.direction == constants.IALLOCATOR_DIR_OUT:
+ if not hasattr(self.op, "allocator") or self.op.allocator is None:
+ raise errors.OpPrereqError("Missing allocator name")
+ elif self.op.direction != constants.IALLOCATOR_DIR_IN:
+ raise errors.OpPrereqError("Wrong allocator test '%s'" %
+ self.op.direction)
+
+ def Exec(self, feedback_fn):
+ """Run the allocator test.
+
+ """
+ if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
+ ial = IAllocator(self.cfg, self.sstore,
+ mode=self.op.mode,
+ name=self.op.name,
+ mem_size=self.op.mem_size,
+ disks=self.op.disks,
+ disk_template=self.op.disk_template,
+ os=self.op.os,
+ tags=self.op.tags,
+ nics=self.op.nics,
+ vcpus=self.op.vcpus,
+ )
+ else:
+ ial = IAllocator(self.cfg, self.sstore,
+ mode=self.op.mode,
+ name=self.op.name,
+ relocate_from=list(self.relocate_from),
+ )
+
+ if self.op.direction == constants.IALLOCATOR_DIR_IN:
+ result = ial.in_text
+ else:
+ ial.Run(self.op.allocator, validate=False)
+ result = ial.out_text
+ return result