import fcntl
import shutil
import socket
+import stat
import StringIO
+try:
+ import affinity # pylint: disable=F0401
+except ImportError:
+ affinity = None
from ganeti import utils
from ganeti import constants
_KVM_NETWORK_SCRIPT = constants.SYSCONFDIR + "/ganeti/kvm-vif-bridge"
+_KVM_START_PAUSED_FLAG = "-S"
# TUN/TAP driver constants, taken from <linux/if_tun.h>
# They are architecture-independent and already hardcoded in qemu-kvm source,
"""QEMU Messaging Protocol (QMP) message.
"""
-
def __init__(self, data):
"""Creates a new QMP message based on the passed data.
is not contained in the message
"""
-
- if field_name in self.data:
- return self.data[field_name]
-
- return None
+ return self.data.get(field_name, None)
def __setitem__(self, field_name, field_value):
"""Set the value of the required field_name to field_value.
return QmpMessage(data)
def __str__(self):
+ """Serialize the message data to a JSON string via serializer.DumpJson.
+
+ """
- # The protocol expects the JSON object to be sent as a single
- # line, hence the need for indent=False.
- return serializer.DumpJson(self.data, indent=False)
+ # The protocol expects the JSON object to be sent as a single line.
+ # NOTE(review): indent=False is no longer passed, so single-line output now
+ # relies on serializer.DumpJson's defaults not adding newlines -- confirm.
+ return serializer.DumpJson(self.data)
def __eq__(self, other):
# When comparing two QmpMessages, we are interested in comparing
_FIRST_MESSAGE_KEY = "QMP"
_EVENT_KEY = "event"
_ERROR_KEY = "error"
+ _RETURN_KEY = RETURN_KEY = "return"
+ _ACTUAL_KEY = ACTUAL_KEY = "actual"
_ERROR_CLASS_KEY = "class"
_ERROR_DATA_KEY = "data"
_ERROR_DESC_KEY = "desc"
self._connected = False
self._buf = ""
+  def _check_socket(self):
+    """Verify that the QMP monitor path exists and is a UNIX socket.
+
+    @raise errors.HypervisorError: when the path does not exist, cannot be
+        stat'ed, or is not a socket
+
+    """
+    try:
+      sock_stat = os.stat(self.monitor_filename)
+    except EnvironmentError, err:
+      if err.errno == errno.ENOENT:
+        raise errors.HypervisorError("No qmp socket found")
+      else:
+        # Interpolate the error into the message; passing it as a second
+        # exception argument would leave "%s" unformatted
+        raise errors.HypervisorError("Error checking qmp socket: %s" %
+                                     utils.ErrnoOrStr(err))
+    if not stat.S_ISSOCK(sock_stat.st_mode):
+      raise errors.HypervisorError("Qmp socket is not a socket")
+
def _check_connection(self):
"""Make sure that the connection is established.
@raise errors.ProgrammerError: when there are data serialization errors
"""
- self.sock.connect(self.monitor_filename)
+ if self._connected:
+ raise errors.ProgrammerError("Cannot connect twice")
+
+ self._check_socket()
+
+ # Connect to the monitor socket; _check_socket() has already validated the path
+ try:
+ self.sock.connect(self.monitor_filename)
+ except EnvironmentError:
+ raise errors.HypervisorError("Can't connect to qmp socket")
self._connected = True
# Check if we receive a correct greeting message from the server
class KVMHypervisor(hv_base.BaseHypervisor):
- """KVM hypervisor interface"""
+ """KVM hypervisor interface
+
+ """
CAN_MIGRATE = True
_ROOT_DIR = constants.RUN_GANETI_DIR + "/kvm-hypervisor"
hv_base.ParamInSet(False,
constants.HT_KVM_SPICE_VALID_VIDEO_STREAM_DETECTION_OPTIONS),
constants.HV_KVM_SPICE_AUDIO_COMPR: hv_base.NO_CHECK,
+ constants.HV_KVM_SPICE_USE_TLS: hv_base.NO_CHECK,
+ constants.HV_KVM_SPICE_TLS_CIPHERS: hv_base.NO_CHECK,
+ constants.HV_KVM_SPICE_USE_VDAGENT: hv_base.NO_CHECK,
constants.HV_KVM_FLOPPY_IMAGE_PATH: hv_base.OPT_FILE_CHECK,
constants.HV_CDROM_IMAGE_PATH: hv_base.OPT_FILE_CHECK,
constants.HV_KVM_CDROM2_IMAGE_PATH: hv_base.OPT_FILE_CHECK,
constants.HV_KVM_USE_CHROOT: hv_base.NO_CHECK,
constants.HV_MEM_PATH: hv_base.OPT_DIR_CHECK,
constants.HV_REBOOT_BEHAVIOR:
- hv_base.ParamInSet(True, constants.REBOOT_BEHAVIORS)
+ hv_base.ParamInSet(True, constants.REBOOT_BEHAVIORS),
+ constants.HV_CPU_MASK: hv_base.OPT_MULTI_CPU_MASK_CHECK,
}
_MIGRATION_STATUS_RE = re.compile("Migration\s+status:\s+(\w+)",
re.M | re.I)
+ _MIGRATION_PROGRESS_RE = \
+ re.compile(r"\s*transferred\s+ram:\s+(?P<transferred>\d+)\s+kbytes\s*\n"
+ r"\s*remaining\s+ram:\s+(?P<remaining>\d+)\s+kbytes\s*\n"
+ r"\s*total\s+ram:\s+(?P<total>\d+)\s+kbytes\s*\n", re.I)
+
_MIGRATION_INFO_MAX_BAD_ANSWERS = 5
_MIGRATION_INFO_RETRY_DELAY = 2
_VERSION_RE = re.compile(r"\b(\d+)\.(\d+)(\.(\d+))?\b")
+ _CPU_INFO_RE = re.compile(r"cpu\s+\#(\d+).*thread_id\s*=\s*(\d+)", re.I)
+ _CPU_INFO_CMD = "info cpus"
+ _CONT_CMD = "cont"
+
ANCILLARY_FILES = [
_KVM_NETWORK_SCRIPT,
]
@type tap: str
"""
-
if instance.tags:
tags = " ".join(instance.tags)
else:
" Network configuration script output: %s" %
(tap, result.fail_reason, result.output))
+  @staticmethod
+  def _VerifyAffinityPackage():
+    """Ensure the optional affinity module was importable.
+
+    @raise errors.HypervisorError: if the module-level C{affinity} name is
+        None, i.e. its import failed
+
+    """
+    if affinity is not None:
+      return
+    raise errors.HypervisorError("affinity Python package not"
+                                 " found; cannot use CPU pinning under KVM")
+
+  @staticmethod
+  def _BuildAffinityCpuMask(cpu_list):
+    """Create a CPU mask suitable for sched_setaffinity from a list of
+    CPUs.
+
+    See man taskset for more info on sched_setaffinity masks.
+    For example: [ 0, 2, 5, 6 ] will return 101 (0x65, 0..01100101).
+    A CPU number listed more than once still sets only its own bit.
+
+    @type cpu_list: list of int
+    @param cpu_list: list of physical CPU numbers to map to vCPUs in order
+    @rtype: int
+    @return: a bit mask of CPU affinities
+
+    """
+    if cpu_list == constants.CPU_PINNING_OFF:
+      return constants.CPU_PINNING_ALL_KVM
+
+    mask = 0
+    for cpu in cpu_list:
+      # OR instead of adding, so a repeated CPU number cannot carry into a
+      # higher bit (e.g. [1, 1] must give 0b10, not 0b100)
+      mask |= 1 << cpu
+    return mask
+
+  @classmethod
+  def _AssignCpuAffinity(cls, cpu_mask, process_id, thread_dict):
+    """Change CPU affinity for running VM according to given CPU mask.
+
+    @param cpu_mask: CPU mask as given by the user. e.g. "0-2,4:all:1,3"
+    @type cpu_mask: string
+    @param process_id: process ID of KVM process. Used to pin entire VM
+        to physical CPUs.
+    @type process_id: int
+    @param thread_dict: map of virtual CPUs to KVM thread IDs
+    @type thread_dict: dict int:int
+    @raise errors.HypervisorError: if pinning is requested but the affinity
+        package could not be imported
+
+    """
+    # Convert the string CPU mask to a list of list of int's
+    cpu_list = utils.ParseMultiCpuMask(cpu_mask)
+
+    if len(cpu_list) == 1:
+      all_cpu_mapping = cpu_list[0]
+      if all_cpu_mapping == constants.CPU_PINNING_OFF:
+        # A single "all" entry means no pinning: nothing to do
+        return
+      # A single non-"all" entry pins the entire VM process to one set of
+      # physical CPUs
+      cls._VerifyAffinityPackage()
+      affinity.set_process_affinity_mask(
+        process_id, cls._BuildAffinityCpuMask(all_cpu_mapping))
+      return
+
+    # The number of vCPUs mapped should match the number of vCPUs
+    # reported by KVM. This was already verified earlier, so
+    # here only as a sanity check.
+    assert len(thread_dict) == len(cpu_list)
+    cls._VerifyAffinityPackage()
+
+    # Entry N of cpu_list holds the physical CPUs for vCPU N
+    for vcpu_no, vcpu_cpus in enumerate(cpu_list):
+      affinity.set_process_affinity_mask(
+        thread_dict[vcpu_no], cls._BuildAffinityCpuMask(vcpu_cpus))
+
+  def _GetVcpuThreadIds(self, instance_name):
+    """Map each vCPU number of an instance to its KVM thread ID.
+
+    The mapping is parsed line by line from the monitor's C{_CPU_INFO_CMD}
+    output; lines that do not match C{_CPU_INFO_RE} are ignored.
+
+    @type instance_name: string
+    @param instance_name: instance in question
+    @rtype: dictionary of int:int
+    @return: a dictionary mapping vCPU numbers to thread IDs
+
+    """
+    vcpu_threads = {}
+    output = self._CallMonitorCommand(instance_name, self._CPU_INFO_CMD)
+    for line in output.stdout.splitlines():
+      match = self._CPU_INFO_RE.search(line)
+      if match:
+        # The regex captures exactly two groups: vCPU number and thread ID
+        vcpu_no, thread_id = map(int, match.groups())
+        vcpu_threads[vcpu_no] = thread_id
+    return vcpu_threads
+
+  def _ExecuteCpuAffinity(self, instance_name, cpu_mask):
+    """Apply the user-supplied CPU pinning mask to a running instance.
+
+    @type instance_name: string
+    @param instance_name: name of instance
+    @type cpu_mask: string
+    @param cpu_mask: CPU pinning mask as entered by user
+
+    """
+    # The KVM process ID is used when the whole VM is pinned at once ...
+    (_, kvm_pid, _) = self._InstancePidAlive(instance_name)
+    # ... and the per-vCPU thread IDs when vCPUs are pinned individually
+    vcpu_threads = self._GetVcpuThreadIds(instance_name)
+    self._AssignCpuAffinity(cpu_mask, kvm_pid, vcpu_threads)
+
def ListInstances(self):
"""Get the list of running instances.
return None
_, memory, vcpus = self._InstancePidInfo(pid)
- stat = "---b-"
+ istat = "---b-"
times = "0"
- return (instance_name, pid, memory, vcpus, stat, times)
+ try:
+ qmp = QmpConnection(self._InstanceQmpMonitor(instance_name))
+ qmp.connect()
+ vcpus = len(qmp.Execute("query-cpus")[qmp.RETURN_KEY])
+ # Will fail if ballooning is not enabled, but we can then just resort to
+ # the value above.
+ mem_bytes = qmp.Execute("query-balloon")[qmp.RETURN_KEY][qmp.ACTUAL_KEY]
+ memory = mem_bytes / 1048576
+ except errors.HypervisorError:
+ pass
+
+ return (instance_name, pid, memory, vcpus, istat, times)
def GetAllInstancesInfo(self):
"""Get properties of all instances.
"""Generate KVM information to start an instance.
"""
- # pylint: disable=R0914
+ # pylint: disable=R0914,R0915
_, v_major, v_min, _ = self._GetKVMVersion()
pidfile = self._InstancePidFile(instance.name)
kvm_cmd = [kvm]
# used just by the vnc server, if enabled
kvm_cmd.extend(["-name", instance.name])
- kvm_cmd.extend(["-m", instance.beparams[constants.BE_MEMORY]])
+ kvm_cmd.extend(["-m", instance.beparams[constants.BE_MAXMEM]])
kvm_cmd.extend(["-smp", instance.beparams[constants.BE_VCPUS]])
kvm_cmd.extend(["-pidfile", pidfile])
+ kvm_cmd.extend(["-balloon", "virtio"])
kvm_cmd.extend(["-daemonize"])
if not instance.hvparams[constants.HV_ACPI]:
kvm_cmd.extend(["-no-acpi"])
- if startup_paused:
- kvm_cmd.extend(["-S"])
if instance.hvparams[constants.HV_REBOOT_BEHAVIOR] == \
constants.INSTANCE_REBOOT_EXIT:
kvm_cmd.extend(["-no-reboot"])
self.ValidateParameters(hvp)
+ if startup_paused:
+ kvm_cmd.extend([_KVM_START_PAUSED_FLAG])
+
if hvp[constants.HV_KVM_FLAG] == constants.HT_KVM_ENABLED:
kvm_cmd.extend(["-enable-kvm"])
elif hvp[constants.HV_KVM_FLAG] == constants.HT_KVM_DISABLED:
# we have both ipv4 and ipv6, let's use the cluster default IP
# version
cluster_family = ssconf.SimpleStore().GetPrimaryIPFamily()
- spice_ip_version = netutils.IPAddress.GetVersionFromAddressFamily(
- cluster_family)
+ spice_ip_version = \
+ netutils.IPAddress.GetVersionFromAddressFamily(cluster_family)
elif addresses[constants.IP4_VERSION]:
spice_ip_version = constants.IP4_VERSION
elif addresses[constants.IP6_VERSION]:
# ValidateParameters checked it.
spice_address = spice_bind
- spice_arg = "addr=%s,port=%s" % (spice_address, instance.network_port)
+ spice_arg = "addr=%s" % spice_address
+ if hvp[constants.HV_KVM_SPICE_USE_TLS]:
+ spice_arg = "%s,tls-port=%s,x509-cacert-file=%s" % (spice_arg,
+ instance.network_port, constants.SPICE_CACERT_FILE)
+ spice_arg = "%s,x509-key-file=%s,x509-cert-file=%s" % (spice_arg,
+ constants.SPICE_CERT_FILE, constants.SPICE_CERT_FILE)
+ tls_ciphers = hvp[constants.HV_KVM_SPICE_TLS_CIPHERS]
+ if tls_ciphers:
+ spice_arg = "%s,tls-ciphers=%s" % (spice_arg, tls_ciphers)
+ else:
+ spice_arg = "%s,port=%s" % (spice_arg, instance.network_port)
+
if not hvp[constants.HV_KVM_SPICE_PASSWORD_FILE]:
spice_arg = "%s,disable-ticketing" % spice_arg
# Audio compression, by default in qemu-kvm it is on
if not hvp[constants.HV_KVM_SPICE_AUDIO_COMPR]:
spice_arg = "%s,playback-compression=off" % spice_arg
+ if not hvp[constants.HV_KVM_SPICE_USE_VDAGENT]:
+ spice_arg = "%s,agent-mouse=off" % spice_arg
logging.info("KVM: SPICE will listen on port %s", instance.network_port)
kvm_cmd.extend(["-spice", spice_arg])
continue
self._ConfigureNIC(instance, nic_seq, nic, taps[nic_seq])
+ # CPU affinity requires kvm to start paused, so we set this flag if the
+ # instance is not already paused and if we are not going to accept a
+ # migrating instance. In the latter case, pausing is not needed.
+ start_kvm_paused = not (_KVM_START_PAUSED_FLAG in kvm_cmd) and not incoming
+
+ # Note: CPU pinning is using up_hvp since changes take effect
+ # during instance startup anyway, and to avoid problems when soft
+ # rebooting the instance.
+ cpu_pinning = False
+ if up_hvp.get(constants.HV_CPU_MASK, None):
+ cpu_pinning = True
+ if start_kvm_paused:
+ kvm_cmd.extend([_KVM_START_PAUSED_FLAG])
+
if security_model == constants.HT_SM_POOL:
ss = ssconf.SimpleStore()
uid_pool = uidpool.ParseUidPool(ss.GetUidPool(), separator="\n")
# for connection.
spice_password_file = conf_hvp[constants.HV_KVM_SPICE_PASSWORD_FILE]
if spice_password_file:
+ spice_pwd = ""
try:
spice_pwd = utils.ReadOneLineFile(spice_password_file, strict=True)
- qmp = QmpConnection(self._InstanceQmpMonitor(instance.name))
- qmp.connect()
- arguments = {
- "protocol": "spice",
- "password": spice_pwd,
- }
- qmp.Execute("set_password", arguments)
except EnvironmentError, err:
raise errors.HypervisorError("Failed to open SPICE password file %s: %s"
% (spice_password_file, err))
+ qmp = QmpConnection(self._InstanceQmpMonitor(instance.name))
+ qmp.connect()
+ arguments = {
+ "protocol": "spice",
+ "password": spice_pwd,
+ }
+ qmp.Execute("set_password", arguments)
+
for filename in temp_files:
utils.RemoveFile(filename)
+ # If requested, set CPU affinity and resume instance execution
+ if cpu_pinning:
+ try:
+ self._ExecuteCpuAffinity(instance.name, up_hvp[constants.HV_CPU_MASK])
+ finally:
+ if start_kvm_paused:
+ # To control CPU pinning, the VM was started frozen, so we need
+ # to resume its execution, but only if freezing was not
+ # explicitly requested.
+ # Note: this is done even when an exception occurred so the VM
+ # is not unintentionally frozen.
+ self._CallMonitorCommand(instance.name, self._CONT_CMD)
+
def StartInstance(self, instance, block_devices, startup_paused):
"""Start an instance.
incoming_address = (target, instance.hvparams[constants.HV_MIGRATION_PORT])
self._ExecuteKVMRuntime(instance, kvm_runtime, incoming=incoming_address)
- def FinalizeMigration(self, instance, info, success):
- """Finalize an instance migration.
+ def FinalizeMigrationDst(self, instance, info, success):
+ """Finalize the instance migration on the target node.
Stop the incoming mode KVM.
"""
instance_name = instance.name
port = instance.hvparams[constants.HV_MIGRATION_PORT]
- pidfile, pid, alive = self._InstancePidAlive(instance_name)
+ _, _, alive = self._InstancePidAlive(instance_name)
if not alive:
raise errors.HypervisorError("Instance not running, cannot migrate")
migrate_command = "migrate -d tcp:%s:%s" % (target, port)
self._CallMonitorCommand(instance_name, migrate_command)
+  def FinalizeMigrationSource(self, instance, success, live):
+    """Finalize the instance migration on the source node.
+
+    On success the source KVM process is killed and its runtime files are
+    removed; on failure the source VM is sent the C{cont} monitor command.
+
+    @type instance: L{objects.Instance}
+    @param instance: the instance that was migrated
+    @type success: bool
+    @param success: whether the migration succeeded or not
+    @type live: bool
+    @param live: whether the user requested a live migration or not; kept
+        for interface compatibility, the instance is resumed after a failed
+        migration in either case
+
+    """
+    # pylint: disable=W0613
+    if success:
+      pidfile, pid, _ = self._InstancePidAlive(instance.name)
+      utils.KillProcess(pid)
+      self._RemoveInstanceRuntimeFiles(pidfile, instance.name)
+    else:
+      # A failed migration can leave the source VM paused (a non-live
+      # migration runs with the VM stopped), so resume it regardless of the
+      # migration mode rather than only for live migrations.
+      # NOTE(review): relies on "cont" being harmless for a VM that is
+      # already running -- confirm for the supported QEMU versions.
+      self._CallMonitorCommand(instance.name, self._CONT_CMD)
+
+ def GetMigrationStatus(self, instance):
+ """Get the migration status
+
+ @type instance: L{objects.Instance}
+ @param instance: the instance that is being migrated
+ @rtype: L{objects.MigrationStatus}
+ @return: the status of the current migration (one of
+ L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
+ progress info that can be retrieved from the hypervisor
+
+ """
info_command = "info migrate"
- done = False
- broken_answers = 0
- while not done:
- result = self._CallMonitorCommand(instance_name, info_command)
+ # Query the monitor at most _MIGRATION_INFO_MAX_BAD_ANSWERS times. The first
+ # answer carrying a recognised status is returned immediately; an empty or
+ # unparseable answer, or an unknown status, is logged and retried after
+ # sleeping _MIGRATION_INFO_RETRY_DELAY seconds.
+ for _ in range(self._MIGRATION_INFO_MAX_BAD_ANSWERS):
+ result = self._CallMonitorCommand(instance.name, info_command)
match = self._MIGRATION_STATUS_RE.search(result.stdout)
if not match:
- broken_answers += 1
if not result.stdout:
logging.info("KVM: empty 'info migrate' result")
else:
logging.warning("KVM: unknown 'info migrate' result: %s",
result.stdout)
- time.sleep(self._MIGRATION_INFO_RETRY_DELAY)
else:
status = match.group(1)
- if status == "completed":
- done = True
- elif status == "active":
- # reset the broken answers count
- broken_answers = 0
- time.sleep(self._MIGRATION_INFO_RETRY_DELAY)
- elif status == "failed" or status == "cancelled":
- if not live:
- self._CallMonitorCommand(instance_name, 'cont')
- raise errors.HypervisorError("Migration %s at the kvm level" %
- status)
- else:
- logging.warning("KVM: unknown migration status '%s'", status)
- broken_answers += 1
- time.sleep(self._MIGRATION_INFO_RETRY_DELAY)
- if broken_answers >= self._MIGRATION_INFO_MAX_BAD_ANSWERS:
- raise errors.HypervisorError("Too many 'info migrate' broken answers")
+ # NOTE(review): the docstring names constants.HV_MIGRATION_VALID_STATUSES
+ # but this checks constants.HV_KVM_MIGRATION_VALID_STATUSES -- confirm the
+ # two sets agree.
+ if status in constants.HV_KVM_MIGRATION_VALID_STATUSES:
+ migration_status = objects.MigrationStatus(status=status)
+ match = self._MIGRATION_PROGRESS_RE.search(result.stdout)
+ if match:
+ migration_status.transferred_ram = match.group("transferred")
+ migration_status.total_ram = match.group("total")
+ # NOTE(review): the progress values above are the raw digit strings
+ # captured by _MIGRATION_PROGRESS_RE (kbytes); confirm MigrationStatus
+ # expects strings rather than ints.
- utils.KillProcess(pid)
- self._RemoveInstanceRuntimeFiles(pidfile, instance_name)
+ return migration_status
+
+ logging.warning("KVM: unknown migration status '%s'", status)
+
+ time.sleep(self._MIGRATION_INFO_RETRY_DELAY)
+
+ # No attempt returned a recognised status: report the migration as failed
+ # instead of raising.
+ return objects.MigrationStatus(status=constants.HV_MIGRATION_FAILED,
+ info="Too many 'info migrate' broken answers")
def GetNodeInfo(self):
"""Return information about the node.
constants.HV_KVM_SPICE_JPEG_IMG_COMPR,
constants.HV_KVM_SPICE_ZLIB_GLZ_IMG_COMPR,
constants.HV_KVM_SPICE_STREAMING_VIDEO_DETECTION,
+ constants.HV_KVM_SPICE_USE_TLS,
])
for param in spice_additional_params:
if hvparams[param]: