- memory_dom0 is the memory allocated for domain0 in MiB
- memory_free is the currently available (free) ram in MiB
- memory_total is the total amount of ram in MiB
+ - hv_version: the hypervisor version, if available
"""
outputarray = {}
_Fail("Failed to accept instance: %s", err, exc=True)
-def FinalizeMigration(instance, info, success):
+def FinalizeMigrationDst(instance, info, success):
"""Finalize any preparation to accept an instance.
@type instance: L{objects.Instance}
"""
hyper = hypervisor.GetHypervisor(instance.hypervisor)
try:
- hyper.FinalizeMigration(instance, info, success)
+ hyper.FinalizeMigrationDst(instance, info, success)
except errors.HypervisorError, err:
- _Fail("Failed to finalize migration: %s", err, exc=True)
+ _Fail("Failed to finalize migration on the target node: %s", err, exc=True)
def MigrateInstance(instance, target, live):
@type live: boolean
@param live: whether the migration should be done live or not (the
interpretation of this parameter is left to the hypervisor)
- @rtype: tuple
- @return: a tuple of (success, msg) where:
- - succes is a boolean denoting the success/failure of the operation
- - msg is a string with details in case of failure
+ @raise RPCFail: if migration fails for some reason
"""
hyper = hypervisor.GetHypervisor(instance.hypervisor)
_Fail("Failed to migrate instance: %s", err, exc=True)
+def FinalizeMigrationSource(instance, success, live):
+ """Finalize the instance migration on the source node.
+
+ @type instance: L{objects.Instance}
+ @param instance: the instance definition of the migrated instance
+ @type success: bool
+ @param success: whether the migration succeeded or not
+ @type live: bool
+ @param live: whether the user requested a live migration or not
+ @raise RPCFail: If the execution fails for some reason
+
+ """
+ hyper = hypervisor.GetHypervisor(instance.hypervisor)
+
+ try:
+ hyper.FinalizeMigrationSource(instance, success, live)
+ except Exception, err: # pylint: disable=W0703
+ _Fail("Failed to finalize the migration on the source node: %s", err,
+ exc=True)
+
+
+def GetMigrationStatus(instance):
+ """Get the migration status
+
+ @type instance: L{objects.Instance}
+ @param instance: the instance that is being migrated
+ @rtype: L{objects.MigrationStatus}
+ @return: the status of the current migration (one of
+ L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
+ progress info that can be retrieved from the hypervisor
+ @raise RPCFail: If the migration status cannot be retrieved
+
+ """
+ hyper = hypervisor.GetHypervisor(instance.hypervisor)
+ try:
+ return hyper.GetMigrationStatus(instance)
+ except Exception, err: # pylint: disable=W0703
+ _Fail("Failed to get migration status: %s", err, exc=True)
+
+
def BlockdevCreate(disk, size, owner, on_primary, info):
"""Creates a block device for an instance.
# W0201 since most LU attributes are defined in CheckPrereq or similar
# functions
-# C0302: since we have waaaay to many lines in this module
+# C0302: since we have waaaay too many lines in this module
import os
import os.path
self._ErrorIf(test, self.ENODEHOOKS, node_name,
"Communication failure in hooks execution: %s", msg)
if res.offline or msg:
- # No need to investigate payload if node is offline or gave an error.
- # override manually lu_result here as _ErrorIf only
- # overrides self.bad
- lu_result = 1
+ # No need to investigate payload if node is offline or gave
+ # an error.
continue
for script, hkr, output in res.payload:
test = hkr == constants.HKR_FAIL
if test:
output = self._HOOKS_INDENT_RE.sub(" ", output)
feedback_fn("%s" % output)
- lu_result = 0
+ lu_result = False
return lu_result
if not redist:
files_all.update(constants.ALL_CERT_FILES)
files_all.update(ssconf.SimpleStore().GetFileList())
+ else:
+ # we need to ship at least the RAPI certificate
+ files_all.add(constants.RAPI_CERT_FILE)
if cluster.modify_etc_hosts:
files_all.add(constants.ETC_HOSTS)
errors.ECODE_NORES)
+def _CheckNodesPhysicalCPUs(lu, nodenames, requested, hypervisor_name):
+ """Checks if nodes have enough physical CPUs
+
+ This function checks if all given nodes have the needed number of
+ physical CPUs. In case any node has less CPUs or we cannot get the
+ information from the node, this function raises an OpPrereqError
+ exception.
+
+ @type lu: C{LogicalUnit}
+ @param lu: a logical unit from which we get configuration data
+ @type nodenames: C{list}
+ @param nodenames: the list of node names to check
+ @type requested: C{int}
+ @param requested: the minimum acceptable number of physical CPUs
+ @type hypervisor_name: C{str}
+ @param hypervisor_name: the hypervisor to query CPU information from
+ @raise errors.OpPrereqError: if the node doesn't have enough CPUs,
+ or we cannot check the node
+
+ """
+ nodeinfo = lu.rpc.call_node_info(nodenames, None, hypervisor_name)
+ for node in nodenames:
+ info = nodeinfo[node]
+ info.Raise("Cannot get current information from node %s" % node,
+ prereq=True, ecode=errors.ECODE_ENVIRON)
+ num_cpus = info.payload.get("cpu_total", None)
+ if not isinstance(num_cpus, int):
+ raise errors.OpPrereqError("Can't compute the number of physical CPUs"
+ " on node %s, result was '%s'" %
+ (node, num_cpus), errors.ECODE_ENVIRON)
+ if requested > num_cpus:
+ raise errors.OpPrereqError("Node %s has %s physical CPUs, but %s are "
+ "required" % (node, num_cpus, requested),
+ errors.ECODE_NORES)
+
+
class LUInstanceStartup(LogicalUnit):
"""Starts an instance.
@ivar shutdown_timeout: In case of failover, timeout of the shutdown
"""
+
+ # Constants
+ _MIGRATION_POLL_INTERVAL = 1 # seconds
+ _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds
+
def __init__(self, lu, instance_name, cleanup=False,
failover=False, fallback=False,
ignore_consistency=False,
"""
instance = self.instance
target_node = self.target_node
+ source_node = self.source_node
migration_info = self.migration_info
- abort_result = self.rpc.call_finalize_migration(target_node,
- instance,
- migration_info,
- False)
+ abort_result = self.rpc.call_instance_finalize_migration_dst(target_node,
+ instance,
+ migration_info,
+ False)
abort_msg = abort_result.fail_msg
if abort_msg:
logging.error("Aborting migration failed on target node %s: %s",
# Don't raise an exception here, as we still have to try to revert the
# disk status, even if this step failed.
+ abort_result = self.rpc.call_instance_finalize_migration_src(source_node,
+ instance, False, self.live)
+ abort_msg = abort_result.fail_msg
+ if abort_msg:
+ logging.error("Aborting migration failed on source node %s: %s",
+ source_node, abort_msg)
+
def _ExecMigration(self):
"""Migrate an instance.
target_node = self.target_node
source_node = self.source_node
+ # Check for hypervisor version mismatch and warn the user.
+ nodeinfo = self.rpc.call_node_info([source_node, target_node],
+ None, self.instance.hypervisor)
+ src_info = nodeinfo[source_node]
+ dst_info = nodeinfo[target_node]
+
+ if ((constants.HV_NODEINFO_KEY_VERSION in src_info.payload) and
+ (constants.HV_NODEINFO_KEY_VERSION in dst_info.payload)):
+ src_version = src_info.payload[constants.HV_NODEINFO_KEY_VERSION]
+ dst_version = dst_info.payload[constants.HV_NODEINFO_KEY_VERSION]
+ if src_version != dst_version:
+ self.feedback_fn("* warning: hypervisor version mismatch between"
+ " source (%s) and target (%s) node" %
+ (src_version, dst_version))
+
self.feedback_fn("* checking disk consistency between source and target")
for dev in instance.disks:
if not _CheckDiskConsistency(self.lu, dev, target_node, False):
raise errors.OpExecError("Could not migrate instance %s: %s" %
(instance.name, msg))
+ self.feedback_fn("* starting memory transfer")
+ last_feedback = time.time()
+ while True:
+ result = self.rpc.call_instance_get_migration_status(source_node,
+ instance)
+ msg = result.fail_msg
+ ms = result.payload # MigrationStatus instance
+ if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
+ logging.error("Instance migration failed, trying to revert"
+ " disk status: %s", msg)
+ self.feedback_fn("Migration failed, aborting")
+ self._AbortMigration()
+ self._RevertDiskStatus()
+ raise errors.OpExecError("Could not migrate instance %s: %s" %
+ (instance.name, msg))
+
+ if ms.status != constants.HV_MIGRATION_ACTIVE:
+ self.feedback_fn("* memory transfer complete")
+ break
+
+ if (utils.TimeoutExpired(last_feedback,
+ self._MIGRATION_FEEDBACK_INTERVAL) and
+ ms.transferred_ram is not None):
+ mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
+ self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
+ last_feedback = time.time()
+
+ time.sleep(self._MIGRATION_POLL_INTERVAL)
+
+ result = self.rpc.call_instance_finalize_migration_src(source_node,
+ instance,
+ True,
+ self.live)
+ msg = result.fail_msg
+ if msg:
+ logging.error("Instance migration succeeded, but finalization failed"
+ " on the source node: %s", msg)
+ raise errors.OpExecError("Could not finalize instance migration: %s" %
+ msg)
+
instance.primary_node = target_node
+
# distribute new instance config to the other nodes
self.cfg.Update(instance, self.feedback_fn)
- result = self.rpc.call_finalize_migration(target_node,
- instance,
- migration_info,
- True)
+ result = self.rpc.call_instance_finalize_migration_dst(target_node,
+ instance,
+ migration_info,
+ True)
msg = result.fail_msg
if msg:
- logging.error("Instance migration succeeded, but finalization failed:"
- " %s", msg)
+ logging.error("Instance migration succeeded, but finalization failed"
+ " on the target node: %s", msg)
raise errors.OpExecError("Could not finalize instance migration: %s" %
msg)
if einfo.has_option(constants.INISECT_INS, "disk_template"):
self.op.disk_template = einfo.get(constants.INISECT_INS,
"disk_template")
+ if self.op.disk_template not in constants.DISK_TEMPLATES:
+ raise errors.OpPrereqError("Disk template specified in configuration"
+ " file is not one of the allowed values:"
+ " %s" % " ".join(constants.DISK_TEMPLATES))
else:
raise errors.OpPrereqError("No disk template specified and the export"
" is missing the disk_template information",
errors.ECODE_INVAL)
if not self.op.disks:
- if einfo.has_option(constants.INISECT_INS, "disk_count"):
- disks = []
- # TODO: import the disk iv_name too
- for idx in range(einfo.getint(constants.INISECT_INS, "disk_count")):
+ disks = []
+ # TODO: import the disk iv_name too
+ for idx in range(constants.MAX_DISKS):
+ if einfo.has_option(constants.INISECT_INS, "disk%d_size" % idx):
disk_sz = einfo.getint(constants.INISECT_INS, "disk%d_size" % idx)
disks.append({constants.IDISK_SIZE: disk_sz})
- self.op.disks = disks
- else:
+ self.op.disks = disks
+ if not disks and self.op.disk_template != constants.DT_DISKLESS:
raise errors.OpPrereqError("No disk info specified and the export"
" is missing the disk information",
errors.ECODE_INVAL)
- if (not self.op.nics and
- einfo.has_option(constants.INISECT_INS, "nic_count")):
+ if not self.op.nics:
nics = []
- for idx in range(einfo.getint(constants.INISECT_INS, "nic_count")):
- ndict = {}
- for name in list(constants.NICS_PARAMETERS) + ["ip", "mac"]:
- v = einfo.get(constants.INISECT_INS, "nic%d_%s" % (idx, name))
- ndict[name] = v
- nics.append(ndict)
+ for idx in range(constants.MAX_NICS):
+ if einfo.has_option(constants.INISECT_INS, "nic%d_mac" % idx):
+ ndict = {}
+ for name in list(constants.NICS_PARAMETERS) + ["ip", "mac"]:
+ v = einfo.get(constants.INISECT_INS, "nic%d_%s" % (idx, name))
+ ndict[name] = v
+ nics.append(ndict)
+ else:
+ break
self.op.nics = nics
if not self.op.tags and einfo.has_option(constants.INISECT_INS, "tags"):
raise errors.OpPrereqError("Cluster does not support lvm-based"
" instances", errors.ECODE_STATE)
- if self.op.hypervisor is None:
+ if (self.op.hypervisor is None or
+ self.op.hypervisor == constants.VALUE_AUTO):
self.op.hypervisor = self.cfg.GetHypervisorType()
cluster = self.cfg.GetClusterInfo()
_CheckGlobalHvParams(self.op.hvparams)
# fill and remember the beparams dict
+ default_beparams = cluster.beparams[constants.PP_DEFAULT]
+ for param, value in self.op.beparams.iteritems():
+ if value == constants.VALUE_AUTO:
+ self.op.beparams[param] = default_beparams[param]
utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
self.be_full = cluster.SimpleFillBE(self.op.beparams)
for idx, nic in enumerate(self.op.nics):
nic_mode_req = nic.get(constants.INIC_MODE, None)
nic_mode = nic_mode_req
- if nic_mode is None:
+ if nic_mode is None or nic_mode == constants.VALUE_AUTO:
nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
# in routed mode, for the first nic, the default ip is 'auto'
# Build nic parameters
link = nic.get(constants.INIC_LINK, None)
+ if link == constants.VALUE_AUTO:
+ link = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_LINK]
nicparams = {}
if nic_mode_req:
- nicparams[constants.NIC_MODE] = nic_mode_req
+ nicparams[constants.NIC_MODE] = nic_mode
if link:
nicparams[constants.NIC_LINK] = link
self.disks.append(new_disk)
if self.op.mode == constants.INSTANCE_IMPORT:
-
- # Check that the new instance doesn't have less disks than the export
- instance_disks = len(self.disks)
- export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
- if instance_disks < export_disks:
- raise errors.OpPrereqError("Not enough disks to import."
- " (instance: %d, export: %d)" %
- (instance_disks, export_disks),
- errors.ECODE_INVAL)
-
disk_images = []
- for idx in range(export_disks):
+ for idx in range(len(self.disks)):
option = "disk%d_dump" % idx
if export_info.has_option(constants.INISECT_INS, option):
# FIXME: are the old os-es, disk sizes, etc. useful?
self.src_images = disk_images
old_name = export_info.get(constants.INISECT_INS, "name")
- try:
- exp_nic_count = export_info.getint(constants.INISECT_INS, "nic_count")
- except (TypeError, ValueError), err:
- raise errors.OpPrereqError("Invalid export file, nic_count is not"
- " an integer: %s" % str(err),
- errors.ECODE_STATE)
if self.op.instance_name == old_name:
for idx, nic in enumerate(self.nics):
- if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
+ if nic.mac == constants.VALUE_AUTO:
nic_mac_ini = "nic%d_mac" % idx
nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
# local check
hypervisor.GetHypervisor(hv_type).CheckParameterSyntax(hv_new)
_CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
- self.hv_new = hv_new # the new actual values
+ self.hv_proposed = self.hv_new = hv_new # the new actual values
self.hv_inst = i_hvdict # the new dict (without defaults)
else:
+ self.hv_proposed = cluster.SimpleFillHV(instance.hypervisor, instance.os,
+ instance.hvparams)
self.hv_new = self.hv_inst = {}
# beparams processing
use_none=True)
utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
be_new = cluster.SimpleFillBE(i_bedict)
- self.be_new = be_new # the new actual values
+ self.be_proposed = self.be_new = be_new # the new actual values
self.be_inst = i_bedict # the new dict (without defaults)
else:
self.be_new = self.be_inst = {}
+ self.be_proposed = cluster.SimpleFillBE(instance.beparams)
be_old = cluster.FillBE(instance)
+ # CPU param validation -- checking every time a parameter is
+ # changed to cover all cases where either CPU mask or vcpus have
+ # changed
+ if (constants.BE_VCPUS in self.be_proposed and
+ constants.HV_CPU_MASK in self.hv_proposed):
+ cpu_list = \
+ utils.ParseMultiCpuMask(self.hv_proposed[constants.HV_CPU_MASK])
+ # Verify mask is consistent with number of vCPUs. Can skip this
+ # test if only 1 entry in the CPU mask, which means same mask
+ # is applied to all vCPUs.
+ if (len(cpu_list) > 1 and
+ len(cpu_list) != self.be_proposed[constants.BE_VCPUS]):
+ raise errors.OpPrereqError("Number of vCPUs [%d] does not match the"
+ " CPU mask [%s]" %
+ (self.be_proposed[constants.BE_VCPUS],
+ self.hv_proposed[constants.HV_CPU_MASK]),
+ errors.ECODE_INVAL)
+
+ # Only perform this test if a new CPU mask is given
+ if constants.HV_CPU_MASK in self.hv_new:
+ # Calculate the largest CPU number requested
+ max_requested_cpu = max(map(max, cpu_list))
+ # Check that all of the instance's nodes have enough physical CPUs to
+ # satisfy the requested CPU mask
+ _CheckNodesPhysicalCPUs(self, instance.all_nodes,
+ max_requested_cpu + 1, instance.hypervisor)
+
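As a worked illustration of the check just added (a sketch, not part of the patch), assume utils.ParseMultiCpuMask turns the mask "0-2,4:all:1,3" into one CPU list per vCPU, with "all" encoded as the CPU_PINNING_ALL_VAL sentinel:

cpu_list = [[0, 1, 2, 4], [-1], [1, 3]]   # hypothetical parse result
# Three entries, so the instance must have exactly 3 vCPUs.
assert len(cpu_list) == 3
# The largest CPU number requested is 4, so _CheckNodesPhysicalCPUs
# must see at least 4 + 1 = 5 physical CPUs on every node.
max_requested_cpu = max(map(max, cpu_list))
assert max_requested_cpu + 1 == 5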
# osparams processing
if self.op.osparams:
i_osdict = _GetUpdatedParams(instance.osparams, self.op.osparams)
NODED_USER = _autoconf.NODED_USER
NODED_GROUP = _autoconf.NODED_GROUP
+# cpu pinning separators and constants
+CPU_PINNING_SEP = ":"
+CPU_PINNING_ALL = "all"
+# internal representation of "all"
+CPU_PINNING_ALL_VAL = -1
+# one "all" entry in a CPU list means CPU pinning is off
+CPU_PINNING_OFF = [CPU_PINNING_ALL_VAL]
+
+# A Xen-specific implementation detail - there is no way to actually say
+# "use any cpu for pinning" in a Xen configuration file, as opposed to the
+# command line, where you can say "xm vcpu-pin <domain> <vcpu> all".
+# The workaround used in Xen is "0-63" (see source code function
+# xm_vcpu_pin in <xen-source>/tools/python/xen/xm/main.py).
+# To support future changes, the following constant is treated as a
+# blackbox string that simply means use-any-cpu-for-pinning-under-xen.
+CPU_PINNING_ALL_XEN = "0-63"
+
+# A KVM-specific implementation detail - the following value is used
+# to set CPU affinity to all processors (#0 through #31), per taskset
+# man page.
+CPU_PINNING_ALL_KVM = 0xFFFFFFFF
# Wipe
DD_CMD = "dd"
HVS_PARAMETERS = frozenset(HVS_PARAMETER_TYPES.keys())
+# Migration statuses
+HV_MIGRATION_COMPLETED = "completed"
+HV_MIGRATION_ACTIVE = "active"
+HV_MIGRATION_FAILED = "failed"
+HV_MIGRATION_CANCELLED = "cancelled"
+
+HV_MIGRATION_VALID_STATUSES = frozenset([
+ HV_MIGRATION_COMPLETED,
+ HV_MIGRATION_ACTIVE,
+ HV_MIGRATION_FAILED,
+ HV_MIGRATION_CANCELLED,
+ ])
+
+HV_MIGRATION_FAILED_STATUSES = frozenset([
+ HV_MIGRATION_FAILED,
+ HV_MIGRATION_CANCELLED,
+ ])
+
+# KVM-specific statuses
+HV_KVM_MIGRATION_VALID_STATUSES = HV_MIGRATION_VALID_STATUSES
+
+ # Node info keys
+ HV_NODEINFO_KEY_VERSION = "hv_version"
+
# Backend parameter names
BE_MEMORY = "memory"
BE_VCPUS = "vcpus"
HV_MIGRATION_MODE: HT_MIGRATION_LIVE,
HV_BLOCKDEV_PREFIX: "sd",
HV_REBOOT_BEHAVIOR: INSTANCE_REBOOT_ALLOWED,
+ HV_CPU_MASK: CPU_PINNING_ALL,
},
HT_XEN_HVM: {
HV_BOOT_ORDER: "cd",
HV_USE_LOCALTIME: False,
HV_BLOCKDEV_PREFIX: "hd",
HV_REBOOT_BEHAVIOR: INSTANCE_REBOOT_ALLOWED,
+ HV_CPU_MASK: CPU_PINNING_ALL,
},
HT_KVM: {
HV_KERNEL_PATH: "/boot/vmlinuz-2.6-kvmU",
HV_KVM_USE_CHROOT: False,
HV_MEM_PATH: "",
HV_REBOOT_BEHAVIOR: INSTANCE_REBOOT_ALLOWED,
+ HV_CPU_MASK: CPU_PINNING_ALL,
},
HT_FAKE: {
},
import shutil
import socket
import StringIO
+try:
+ import affinity # pylint: disable=F0401
+except ImportError:
+ affinity = None
from ganeti import utils
from ganeti import constants
_KVM_NETWORK_SCRIPT = constants.SYSCONFDIR + "/ganeti/kvm-vif-bridge"
+_KVM_START_PAUSED_FLAG = "-S"
# TUN/TAP driver constants, taken from <linux/if_tun.h>
# They are architecture-independent and already hardcoded in qemu-kvm source,
constants.HV_KVM_USE_CHROOT: hv_base.NO_CHECK,
constants.HV_MEM_PATH: hv_base.OPT_DIR_CHECK,
constants.HV_REBOOT_BEHAVIOR:
- hv_base.ParamInSet(True, constants.REBOOT_BEHAVIORS)
+ hv_base.ParamInSet(True, constants.REBOOT_BEHAVIORS),
+ constants.HV_CPU_MASK: hv_base.OPT_MULTI_CPU_MASK_CHECK,
}
_MIGRATION_STATUS_RE = re.compile("Migration\s+status:\s+(\w+)",
re.M | re.I)
+ _MIGRATION_PROGRESS_RE = re.compile(
+ "\s*transferred\s+ram:\s+(?P<transferred>\d+)\s+kbytes\s*\n"
+ "\s*remaining\s+ram:\s+(?P<remaining>\d+)\s+kbytes\s*\n"
+ "\s*total\s+ram:\s+(?P<total>\d+)\s+kbytes\s*\n", re.I)
+
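To make the new progress regex concrete, here is a minimal self-contained sketch of it picking apart monitor output; the sample text is hypothetical, since the exact "info migrate" wording varies between QEMU versions:

import re

_MIGRATION_PROGRESS_RE = re.compile(
  r"\s*transferred\s+ram:\s+(?P<transferred>\d+)\s+kbytes\s*\n"
  r"\s*remaining\s+ram:\s+(?P<remaining>\d+)\s+kbytes\s*\n"
  r"\s*total\s+ram:\s+(?P<total>\d+)\s+kbytes\s*\n", re.I)

# Hypothetical monitor reply for an in-flight migration:
sample = ("Migration status: active\n"
          "transferred ram: 131072 kbytes\n"
          "remaining ram: 131072 kbytes\n"
          "total ram: 262144 kbytes\n")
match = _MIGRATION_PROGRESS_RE.search(sample)
assert match.group("transferred") == "131072"
assert match.group("total") == "262144"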
_MIGRATION_INFO_MAX_BAD_ANSWERS = 5
_MIGRATION_INFO_RETRY_DELAY = 2
_VERSION_RE = re.compile(r"\b(\d+)\.(\d+)\.(\d+)\b")
+ _CPU_INFO_RE = re.compile(r"cpu\s+\#(\d+).*thread_id\s*=\s*(\d+)", re.I)
+ _CPU_INFO_CMD = "info cpus"
+ _CONT_CMD = "cont"
+
ANCILLARY_FILES = [
_KVM_NETWORK_SCRIPT,
]
" Network configuration script output: %s" %
(tap, result.fail_reason, result.output))
+ @staticmethod
+ def _VerifyAffinityPackage():
+ if affinity is None:
+ raise errors.HypervisorError("affinity Python package not"
+ " found; cannot use CPU pinning under KVM")
+
+ @staticmethod
+ def _BuildAffinityCpuMask(cpu_list):
+ """Create a CPU mask suitable for sched_setaffinity from a list of
+ CPUs.
+
+ See man taskset for more info on sched_setaffinity masks.
+ For example: [ 0, 2, 5, 6 ] will return 101 (0x65, 0b01100101).
+
+ @type cpu_list: list of int
+ @param cpu_list: list of physical CPU numbers to map to vCPUs in order
+ @rtype: int
+ @return: a bit mask of CPU affinities
+
+ """
+ if cpu_list == constants.CPU_PINNING_OFF:
+ return constants.CPU_PINNING_ALL_KVM
+ else:
+ return sum(2 ** cpu for cpu in cpu_list)
+
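The mask arithmetic above is worth a standalone check; this sketch reproduces the docstring's example without any Ganeti imports:

def build_affinity_cpu_mask(cpu_list):
  # One bit per physical CPU number, as sched_setaffinity/taskset expect.
  return sum(2 ** cpu for cpu in cpu_list)

# CPUs 0, 2, 5 and 6 set bits 0, 2, 5 and 6:
assert build_affinity_cpu_mask([0, 2, 5, 6]) == 101  # 0x65 == 0b01100101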
+ @classmethod
+ def _AssignCpuAffinity(cls, cpu_mask, process_id, thread_dict):
+ """Change CPU affinity for running VM according to given CPU mask.
+
+ @type cpu_mask: string
+ @param cpu_mask: CPU mask as given by the user, e.g. "0-2,4:all:1,3"
+ @type process_id: int
+ @param process_id: process ID of the KVM process, used to pin the
+ entire VM to physical CPUs
+ @type thread_dict: dict int:int
+ @param thread_dict: map of virtual CPUs to KVM thread IDs
+
+ """
+
+ # Convert the string CPU mask to a list of list of int's
+ cpu_list = utils.ParseMultiCpuMask(cpu_mask)
+
+ if len(cpu_list) == 1:
+ all_cpu_mapping = cpu_list[0]
+ if all_cpu_mapping == constants.CPU_PINNING_OFF:
+ # If CPU pinning has 1 entry that's "all", then do nothing
+ pass
+ else:
+ # If CPU pinning has one non-all entry, map the entire VM to
+ # one set of physical CPUs
+ cls._VerifyAffinityPackage()
+ affinity.set_process_affinity_mask(process_id,
+ cls._BuildAffinityCpuMask(all_cpu_mapping))
+ else:
+ # The number of vCPUs mapped should match the number of vCPUs
+ # reported by KVM. This was already verified earlier, so this
+ # assert is only a sanity check.
+ assert len(thread_dict) == len(cpu_list)
+ cls._VerifyAffinityPackage()
+
+ # For each vCPU, map it to the proper list of physical CPUs
+ for i, vcpu in enumerate(cpu_list):
+ affinity.set_process_affinity_mask(thread_dict[i],
+ cls._BuildAffinityCpuMask(vcpu))
+
+ def _GetVcpuThreadIds(self, instance_name):
+ """Get a mapping of vCPU no. to thread IDs for the instance
+
+ @type instance_name: string
+ @param instance_name: instance in question
+ @rtype: dictionary of int:int
+ @return: a dictionary mapping vCPU numbers to thread IDs
+
+ """
+ result = {}
+ output = self._CallMonitorCommand(instance_name, self._CPU_INFO_CMD)
+ for line in output.stdout.splitlines():
+ match = self._CPU_INFO_RE.search(line)
+ if not match:
+ continue
+ grp = map(int, match.groups())
+ result[grp[0]] = grp[1]
+
+ return result
+
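The mapping depends entirely on _CPU_INFO_RE; a small sketch against hypothetical "info cpus" output (the field layout differs between QEMU versions, only the CPU number and thread_id are relied upon):

import re

_CPU_INFO_RE = re.compile(r"cpu\s+\#(\d+).*thread_id\s*=\s*(\d+)", re.I)

# Hypothetical monitor reply:
sample = ("* CPU #0: pc=0x00000000 thread_id=4242\n"
          "  CPU #1: pc=0x00000000 thread_id=4243\n")
result = {}
for line in sample.splitlines():
  match = _CPU_INFO_RE.search(line)
  if match:
    vcpu, tid = map(int, match.groups())
    result[vcpu] = tid
assert result == {0: 4242, 1: 4243}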
+ def _ExecuteCpuAffinity(self, instance_name, cpu_mask):
+ """Complete CPU pinning.
+
+ @type instance_name: string
+ @param instance_name: name of instance
+ @type cpu_mask: string
+ @param cpu_mask: CPU pinning mask as entered by user
+
+ """
+ # Get KVM process ID, to be used if need to pin entire VM
+ _, pid, _ = self._InstancePidAlive(instance_name)
+ # Get vCPU thread IDs, to be used if need to pin vCPUs separately
+ thread_dict = self._GetVcpuThreadIds(instance_name)
+ # Run CPU pinning, based on configured mask
+ self._AssignCpuAffinity(cpu_mask, pid, thread_dict)
+
def ListInstances(self):
"""Get the list of running instances.
kvm_cmd.extend(["-daemonize"])
if not instance.hvparams[constants.HV_ACPI]:
kvm_cmd.extend(["-no-acpi"])
- if startup_paused:
- kvm_cmd.extend(["-S"])
if instance.hvparams[constants.HV_REBOOT_BEHAVIOR] == \
constants.INSTANCE_REBOOT_EXIT:
kvm_cmd.extend(["-no-reboot"])
self.ValidateParameters(hvp)
+ if startup_paused:
+ kvm_cmd.extend([_KVM_START_PAUSED_FLAG])
+
if hvp[constants.HV_KVM_FLAG] == constants.HT_KVM_ENABLED:
kvm_cmd.extend(["-enable-kvm"])
elif hvp[constants.HV_KVM_FLAG] == constants.HT_KVM_DISABLED:
continue
self._ConfigureNIC(instance, nic_seq, nic, taps[nic_seq])
+ # CPU affinity requires kvm to start paused, so we set this flag if the
+ # instance is not already paused and if we are not going to accept a
+ # migrating instance. In the latter case, pausing is not needed.
+ start_kvm_paused = _KVM_START_PAUSED_FLAG not in kvm_cmd and not incoming
+
+ # Note: CPU pinning is using up_hvp since changes take effect
+ # during instance startup anyway, and to avoid problems when soft
+ # rebooting the instance.
+ cpu_pinning = False
+ if up_hvp.get(constants.HV_CPU_MASK, None):
+ cpu_pinning = True
+ if start_kvm_paused:
+ kvm_cmd.extend([_KVM_START_PAUSED_FLAG])
+
if security_model == constants.HT_SM_POOL:
ss = ssconf.SimpleStore()
uid_pool = uidpool.ParseUidPool(ss.GetUidPool(), separator="\n")
for filename in temp_files:
utils.RemoveFile(filename)
+ # If requested, set CPU affinity and resume instance execution
+ if cpu_pinning:
+ try:
+ self._ExecuteCpuAffinity(instance.name, up_hvp[constants.HV_CPU_MASK])
+ finally:
+ if start_kvm_paused:
+ # To control CPU pinning, the VM was started frozen, so we need
+ # to resume its execution, but only if freezing was not
+ # explicitly requested.
+ # Note: this is done even when an exception occurred so the VM
+ # is not unintentionally frozen.
+ self._CallMonitorCommand(instance.name, self._CONT_CMD)
+
def StartInstance(self, instance, block_devices, startup_paused):
"""Start an instance.
incoming_address = (target, instance.hvparams[constants.HV_MIGRATION_PORT])
self._ExecuteKVMRuntime(instance, kvm_runtime, incoming=incoming_address)
- def FinalizeMigration(self, instance, info, success):
- """Finalize an instance migration.
+ def FinalizeMigrationDst(self, instance, info, success):
+ """Finalize the instance migration on the target node.
Stop the incoming mode KVM.
"""
instance_name = instance.name
port = instance.hvparams[constants.HV_MIGRATION_PORT]
- pidfile, pid, alive = self._InstancePidAlive(instance_name)
+ _, _, alive = self._InstancePidAlive(instance_name)
if not alive:
raise errors.HypervisorError("Instance not running, cannot migrate")
migrate_command = "migrate -d tcp:%s:%s" % (target, port)
self._CallMonitorCommand(instance_name, migrate_command)
+ def FinalizeMigrationSource(self, instance, success, live):
+ """Finalize the instance migration on the source node.
+
+ @type instance: L{objects.Instance}
+ @param instance: the instance that was migrated
+ @type success: bool
+ @param success: whether the migration succeeded or not
+ @type live: bool
+ @param live: whether the user requested a live migration or not
+
+ """
+ if success:
+ pidfile, pid, _ = self._InstancePidAlive(instance.name)
+ utils.KillProcess(pid)
+ self._RemoveInstanceRuntimeFiles(pidfile, instance.name)
+ elif live:
+ self._CallMonitorCommand(instance.name, self._CONT_CMD)
+
+ def GetMigrationStatus(self, instance):
+ """Get the migration status
+
+ @type instance: L{objects.Instance}
+ @param instance: the instance that is being migrated
+ @rtype: L{objects.MigrationStatus}
+ @return: the status of the current migration (one of
+ L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
+ progress info that can be retrieved from the hypervisor
+
+ """
info_command = "info migrate"
- done = False
- broken_answers = 0
- while not done:
- result = self._CallMonitorCommand(instance_name, info_command)
+ for _ in range(self._MIGRATION_INFO_MAX_BAD_ANSWERS):
+ result = self._CallMonitorCommand(instance.name, info_command)
match = self._MIGRATION_STATUS_RE.search(result.stdout)
if not match:
- broken_answers += 1
if not result.stdout:
logging.info("KVM: empty 'info migrate' result")
else:
logging.warning("KVM: unknown 'info migrate' result: %s",
result.stdout)
- time.sleep(self._MIGRATION_INFO_RETRY_DELAY)
else:
status = match.group(1)
- if status == "completed":
- done = True
- elif status == "active":
- # reset the broken answers count
- broken_answers = 0
- time.sleep(self._MIGRATION_INFO_RETRY_DELAY)
- elif status == "failed" or status == "cancelled":
- if not live:
- self._CallMonitorCommand(instance_name, 'cont')
- raise errors.HypervisorError("Migration %s at the kvm level" %
- status)
- else:
- logging.warning("KVM: unknown migration status '%s'", status)
- broken_answers += 1
- time.sleep(self._MIGRATION_INFO_RETRY_DELAY)
- if broken_answers >= self._MIGRATION_INFO_MAX_BAD_ANSWERS:
- raise errors.HypervisorError("Too many 'info migrate' broken answers")
+ if status in constants.HV_KVM_MIGRATION_VALID_STATUSES:
+ migration_status = objects.MigrationStatus(status=status)
+ match = self._MIGRATION_PROGRESS_RE.search(result.stdout)
+ if match:
+ migration_status.transferred_ram = match.group("transferred")
+ migration_status.total_ram = match.group("total")
- utils.KillProcess(pid)
- self._RemoveInstanceRuntimeFiles(pidfile, instance_name)
+ return migration_status
+
+ logging.warning("KVM: unknown migration status '%s'", status)
+
+ time.sleep(self._MIGRATION_INFO_RETRY_DELAY)
+
+ return objects.MigrationStatus(status=constants.HV_MIGRATION_FAILED,
+ info="Too many 'info migrate' broken answers")
def GetNodeInfo(self):
"""Return information about the node.
- This is just a wrapper over the base GetLinuxNodeInfo method.
-
@return: a dict with the following keys (values in MiB):
- memory_total: the total memory size on the node
- memory_free: the available memory on the node for instances
- memory_dom0: the memory used by the node itself, if available
+ - hv_version: the hypervisor version in the form (major, minor,
+ revision)
"""
- return self.GetLinuxNodeInfo()
+ result = self.GetLinuxNodeInfo()
+ _, v_major, v_min, v_rev = self._GetKVMVersion()
+ result[constants.HV_NODEINFO_KEY_VERSION] = (v_major, v_min, v_rev)
+ return result
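A short sketch of how the new hv_version payload entry feeds the mismatch warning added to _ExecMigration (all values hypothetical):

# Node info payloads as _ExecMigration would receive them:
src_payload = {"memory_free": 9216, "hv_version": (1, 0, 12)}
dst_payload = {"memory_free": 8192, "hv_version": (1, 1, 0)}
if src_payload["hv_version"] != dst_payload["hv_version"]:
  print ("* warning: hypervisor version mismatch between source (%s)"
         " and target (%s) node" %
         (src_payload["hv_version"], dst_payload["hv_version"]))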
@classmethod
def GetInstanceConsole(cls, instance, hvparams, beparams):
"""
utils.RemoveFile("/etc/xen/%s" % instance_name)
+ @classmethod
+ def _CreateConfigCpus(cls, cpu_mask):
+ """Create a CPU config string that's compatible with Xen's
+ configuration file.
+
+ """
+ # Convert the string CPU mask to a list of list of int's
+ cpu_list = utils.ParseMultiCpuMask(cpu_mask)
+
+ if len(cpu_list) == 1:
+ all_cpu_mapping = cpu_list[0]
+ if all_cpu_mapping == constants.CPU_PINNING_OFF:
+ # If CPU pinning has 1 entry that's "all", then remove the
+ # parameter from the config file
+ return None
+ else:
+ # If CPU pinning has one non-all entry, map the entire VM (all
+ # vCPUs) to one set of physical CPUs, using format 'cpu = "c1,c2"'
+ return "cpu = \"%s\"" % ",".join(map(str, all_cpu_mapping))
+ else:
+ def _GetCPUMap(vcpu):
+ if vcpu[0] == constants.CPU_PINNING_ALL_VAL:
+ cpu_map = constants.CPU_PINNING_ALL_XEN
+ else:
+ cpu_map = ",".join(map(str, vcpu))
+ return "\"%s\"" % cpu_map
+
+ # build the result string in format 'cpus = [ "c", "c", "c" ]',
+ # where each c is a physical CPU number, a range, a list, or any
+ # combination
+ return "cpus = [ %s ]" % ", ".join(map(_GetCPUMap, cpu_list))
+
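For reference, a runnable sketch of the three translations this helper performs, again assuming utils.ParseMultiCpuMask encodes "all" as the -1 sentinel:

CPU_PINNING_ALL_VAL = -1      # sentinel for "all", as in constants.py
CPU_PINNING_ALL_XEN = "0-63"  # Xen config stand-in for "any CPU"

def create_config_cpus(cpu_list):
  # cpu_list is the (hypothetical) output of utils.ParseMultiCpuMask.
  if len(cpu_list) == 1:
    if cpu_list[0] == [CPU_PINNING_ALL_VAL]:
      return None  # pinning is off: emit no directive at all
    return "cpu = \"%s\"" % ",".join(map(str, cpu_list[0]))
  def _get_cpu_map(vcpu):
    if vcpu[0] == CPU_PINNING_ALL_VAL:
      return "\"%s\"" % CPU_PINNING_ALL_XEN
    return "\"%s\"" % ",".join(map(str, vcpu))
  return "cpus = [ %s ]" % ", ".join(map(_get_cpu_map, cpu_list))

assert create_config_cpus([[-1]]) is None                # mask "all"
assert create_config_cpus([[1, 3]]) == 'cpu = "1,3"'     # mask "1,3"
assert (create_config_cpus([[1], [-1], [2, 4]]) ==       # mask "1:all:2,4"
        'cpus = [ "1", "0-63", "2,4" ]')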
@staticmethod
def _RunXmList(xmlist_errors):
"""Helper function for L{_GetXMList} to run "xm list".
- nr_cpus: total number of CPUs
- nr_nodes: in a NUMA system, the number of domains
- nr_sockets: the number of physical CPU sockets in the node
+ - hv_version: the hypervisor version in the form (major, minor)
"""
# note: in xen 3, memory has changed to total_memory
xmoutput = result.stdout.splitlines()
result = {}
cores_per_socket = threads_per_core = nr_cpus = None
+ xen_major, xen_minor = None, None
for line in xmoutput:
splitfields = line.split(":", 1)
cores_per_socket = int(val)
elif key == "threads_per_core":
threads_per_core = int(val)
+ elif key == "xen_major":
+ xen_major = int(val)
+ elif key == "xen_minor":
+ xen_minor = int(val)
if (cores_per_socket is not None and
threads_per_core is not None and nr_cpus is not None):
if dom0_info is not None:
result["memory_dom0"] = dom0_info[2]
+ if not (xen_major is None or xen_minor is None):
+ result[constants.HV_NODEINFO_KEY_VERSION] = (xen_major, xen_minor)
+
return result
@classmethod
"""
pass
- def FinalizeMigration(self, instance, info, success):
+ def FinalizeMigrationDst(self, instance, info, success):
"""Finalize an instance migration.
After a successful migration we write the xen config file.
if result.failed:
raise errors.HypervisorError("Failed to migrate instance %s: %s" %
(instance.name, result.output))
- # remove old xen file after migration succeeded
- try:
- self._RemoveConfigFile(instance.name)
- except EnvironmentError:
- logging.exception("Failure while removing instance config file")
+
+ def FinalizeMigrationSource(self, instance, success, live):
+ """Finalize the instance migration on the source node.
+
+ @type instance: L{objects.Instance}
+ @param instance: the instance that was migrated
+ @type success: bool
+ @param success: whether the migration succeeded or not
+ @type live: bool
+ @param live: whether the user requested a live migration or not
+
+ """
+ # pylint: disable=W0613
+ if success:
+ # remove old xen file after migration succeeded
+ try:
+ self._RemoveConfigFile(instance.name)
+ except EnvironmentError:
+ logging.exception("Failure while removing instance config file")
+
+ def GetMigrationStatus(self, instance):
+ """Get the migration status
+
+ As MigrateInstance for Xen is still blocking, if this method is called
+ it means that MigrateInstance has completed successfully. So we can
+ safely assume the migration was successful and report this fact to the
+ client.
+
+ @type instance: L{objects.Instance}
+ @param instance: the instance that is being migrated
+ @rtype: L{objects.MigrationStatus}
+ @return: the status of the current migration (one of
+ L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
+ progress info that can be retrieved from the hypervisor
+
+ """
+ return objects.MigrationStatus(status=constants.HV_MIGRATION_COMPLETED)
@classmethod
def PowercycleNode(cls):
# TODO: Add a check for the blockdev prefix (matching [a-z:] or similar).
constants.HV_BLOCKDEV_PREFIX: hv_base.NO_CHECK,
constants.HV_REBOOT_BEHAVIOR:
- hv_base.ParamInSet(True, constants.REBOOT_BEHAVIORS)
+ hv_base.ParamInSet(True, constants.REBOOT_BEHAVIORS),
+ constants.HV_CPU_MASK: hv_base.OPT_MULTI_CPU_MASK_CHECK,
}
@classmethod
# rest of the settings
config.write("memory = %d\n" % instance.beparams[constants.BE_MEMORY])
config.write("vcpus = %d\n" % instance.beparams[constants.BE_VCPUS])
+ cpu_pinning = cls._CreateConfigCpus(hvp[constants.HV_CPU_MASK])
+ if cpu_pinning:
+ config.write("%s\n" % cpu_pinning)
+
config.write("name = '%s'\n" % instance.name)
vif_data = []
# TODO: Add a check for the blockdev prefix (matching [a-z:] or similar).
constants.HV_BLOCKDEV_PREFIX: hv_base.NO_CHECK,
constants.HV_REBOOT_BEHAVIOR:
- hv_base.ParamInSet(True, constants.REBOOT_BEHAVIORS)
+ hv_base.ParamInSet(True, constants.REBOOT_BEHAVIORS),
+ constants.HV_CPU_MASK: hv_base.OPT_MULTI_CPU_MASK_CHECK,
}
@classmethod
config.write("builder = 'hvm'\n")
config.write("memory = %d\n" % instance.beparams[constants.BE_MEMORY])
config.write("vcpus = %d\n" % instance.beparams[constants.BE_VCPUS])
+ cpu_pinning = cls._CreateConfigCpus(hvp[constants.HV_CPU_MASK])
+ if cpu_pinning:
+ config.write("%s\n" % cpu_pinning)
+
config.write("name = '%s'\n" % instance.name)
if hvp[constants.HV_PAE]:
config.write("pae = 1\n")
metrics and thus the influence of the dynamic utilisation will be
practically insignificant.
--t *datafile*, --text-data=*datafile*
- The name of the file holding node and instance information (if not
- collecting via RAPI or LUXI). This or one of the other backends must
- be selected.
-
-S *filename*, --save-cluster=*filename*
If given, the state of the cluster before the balancing is saved to
the given file plus the extension "original"
(i.e. *filename*.original), and the state at the end of the
balancing is saved to the given file plus the extension "balanced"
(i.e. *filename*.balanced). This allows re-feeding the cluster state
- to either hbal itself or for example hspace.
+ to either hbal itself or for example hspace via the ``-t`` option.
+
+-t *datafile*, --text-data=*datafile*
+ Backend specification: the name of the file holding node and instance
+ information (if not collecting via RAPI or LUXI). This or one of the
+ other backends must be selected. The option is described in the man
+ page **htools**(1).
-m *cluster*
- Collect data directly from the *cluster* given as an argument via
- RAPI. If the argument doesn't contain a colon (:), then it is
- converted into a fully-built URL via prepending ``https://`` and
- appending the default RAPI port, otherwise it's considered a
- fully-specified URL and is used as-is.
+ Backend specification: collect data directly from the *cluster* given
+ as an argument via RAPI. The option is described in the man page
+ **htools**(1).
-L [*path*]
- Collect data directly from the master daemon, which is to be
- contacted via the luxi (an internal Ganeti protocol). An optional
- *path* argument is interpreted as the path to the unix socket on
- which the master daemon listens; otherwise, the default path used by
- ganeti when installed with *--localstatedir=/var* is used.
+ Backend specification: collect data directly from the master daemon,
+ which is to be contacted via LUXI (an internal Ganeti protocol). The
+ option is described in the man page **htools**(1).
-X
When using the Luxi backend, hbal can also execute the given
jobset will be executed in parallel. The jobsets themselves are
executed serially.
+ The execution of the job series can be interrupted; see below for
+ signal handling.
+
-l *N*, --max-length=*N*
Restrict the solution to this length. This can be used for example
to automate the execution of the balancing.
-V, --version
Just show the program version and exit.
+ SIGNAL HANDLING
+ ---------------
+
+ When executing jobs via LUXI (using the ``-X`` option), normally hbal
+ will execute all jobs until either one errors out or all the jobs finish
+ successfully.
+
+ Since balancing can take a long time, it is possible to stop hbal early
+ in two ways:
+
+ - by sending a ``SIGINT`` (``^C``), hbal will register the termination
+ request, and will wait until the currently submitted jobs finish, at
+ which point it will exit (with exit code 1)
+ - by sending a ``SIGTERM``, hbal will immediately exit (with exit code
+ 2); it is the responsibility of the user to follow up with Ganeti on
+ the result of the currently-executing jobs
+
+ Note that in any situation, it's perfectly safe to kill hbal, either via
+ the above signals or via any other signal (e.g. ``SIGQUIT``,
+ ``SIGKILL``), since the jobs themselves are processed by Ganeti whereas
+ hbal (after submission) only watches their progression. In this case,
+ the user will again have to query Ganeti for job results.
+
EXIT STATUS
-----------
- The exit status of the command will be zero, unless for some reason
- the algorithm fatally failed (e.g. wrong node or instance data), or
- (in case of job execution) any job has failed.
+ The exit status of the command will be zero, unless for some reason the
+ algorithm fatally failed (e.g. wrong node or instance data), or (in case
+ of job execution) either one of the jobs has failed or the balancing was
+ interrupted early.
BUGS
----
- The program does not check its input data for consistency, and aborts
- with cryptic errors messages in this case.
+ The program does not check all its input data for consistency, and
+ sometimes aborts with cryptic error messages when given invalid data.
The algorithm is not perfect.
- The output format is not easily scriptable, and the program should
- feed moves directly into Ganeti (either via RAPI or via a gnt-debug
- input file).
-
EXAMPLE
-------