# 02110-1301, USA.
-"""Functions used by the node daemon"""
+"""Functions used by the node daemon
+
+@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
+ the L{UploadFile} function
+
+"""
import os
from ganeti import ssconf
+_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
+
+
class RPCFail(Exception):
"""Class denoting RPC failure.
"""
+
def _Fail(msg, *args, **kwargs):
"""Log an error and the raise an RPCFail exception.
utils.RemoveFile(full_name)
+def _BuildUploadFileList():
+ """Build the list of allowed upload files.
+
+ This is abstracted so that it's built only once at module import time.
+
+ """
+ allowed_files = set([
+ constants.CLUSTER_CONF_FILE,
+ constants.ETC_HOSTS,
+ constants.SSH_KNOWN_HOSTS_FILE,
+ constants.VNC_PASSWORD_FILE,
+ constants.RAPI_CERT_FILE,
+ constants.RAPI_USERS_FILE,
+ constants.HMAC_CLUSTER_KEY,
+ ])
+
+ for hv_name in constants.HYPER_TYPES:
+ hv_class = hypervisor.GetHypervisorClass(hv_name)
+ allowed_files.update(hv_class.GetAncillaryFiles())
+
+ return frozenset(allowed_files)
+
+
+_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
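+# NB: UploadFile() checks every incoming path against this frozenset;
+# files not listed here (or by a hypervisor's GetAncillaryFiles) are
+# rejected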
+
+
def JobQueuePurge():
"""Removes job queue files and archived jobs.
master_ip = cfg.GetMasterIP()
master_node = cfg.GetMasterNode()
except errors.ConfigurationError, err:
- _Fail("Cluster configuration incomplete", exc=True)
- return master_netdev, master_ip, master_node
+ _Fail("Cluster configuration incomplete: %s", err, exc=True)
+ return (master_netdev, master_ip, master_node)
-def StartMaster(start_daemons):
+def StartMaster(start_daemons, no_voting):
"""Activate local node as master node.
  The function will always try to activate the IP address of the master
@type start_daemons: boolean
@param start_daemons: whether to also start the master
daemons (ganeti-masterd and ganeti-rapi)
+ @type no_voting: boolean
+ @param no_voting: whether to start ganeti-masterd without a node vote
+ (if start_daemons is True), but still non-interactively
@rtype: None
"""
# GetMasterInfo will raise an exception if not able to return data
master_netdev, master_ip, _ = GetMasterInfo()
- payload = []
+ err_msgs = []
if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
if utils.OwnIpAddress(master_ip):
# we already have the ip:
else:
msg = "Someone else has the master ip, not activating"
logging.error(msg)
- payload.append(msg)
+ err_msgs.append(msg)
else:
result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
"dev", master_netdev, "label",
if result.failed:
msg = "Can't activate master IP: %s" % result.output
logging.error(msg)
- payload.append(msg)
+ err_msgs.append(msg)
result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
"-s", master_ip, master_ip])
# and now start the master and rapi daemons
if start_daemons:
- for daemon in 'ganeti-masterd', 'ganeti-rapi':
- result = utils.RunCmd([daemon])
+ daemons_params = {
+ 'ganeti-masterd': [],
+ 'ganeti-rapi': [],
+ }
+ if no_voting:
+ daemons_params['ganeti-masterd'].append('--no-voting')
+ daemons_params['ganeti-masterd'].append('--yes-do-it')
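+      # with no_voting, the master daemon is thus started as, e.g.:
+      #   ganeti-masterd --no-voting --yes-do-it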
+ for daemon in daemons_params:
+ cmd = [daemon]
+ cmd.extend(daemons_params[daemon])
+ result = utils.RunCmd(cmd)
if result.failed:
msg = "Can't start daemon %s: %s" % (daemon, result.output)
logging.error(msg)
- payload.append(msg)
+ err_msgs.append(msg)
- if payload:
- _Fail("; ".join(payload))
+ if err_msgs:
+ _Fail("; ".join(err_msgs))
def StopMaster(stop_daemons):
if stop_daemons:
# stop/kill the rapi and the master daemon
- for daemon in constants.RAPI_PID, constants.MASTERD_PID:
+ for daemon in constants.RAPI, constants.MASTERD:
utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
try:
priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
- f = open(pub_key, 'r')
- try:
- utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
- finally:
- f.close()
+ utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
utils.RemoveFile(priv_key)
utils.RemoveFile(pub_key)
except errors.OpExecError:
logging.exception("Error while processing ssh files")
+ try:
+ utils.RemoveFile(constants.HMAC_CLUSTER_KEY)
+ utils.RemoveFile(constants.RAPI_CERT_FILE)
+ utils.RemoveFile(constants.SSL_CERT_FILE)
+  except EnvironmentError:
+ logging.exception("Error while removing cluster secrets")
+
+ confd_pid = utils.ReadPidFile(utils.DaemonPidFileName(constants.CONFD))
+
+ if confd_pid:
+ utils.KillProcess(confd_pid, timeout=2)
+
# Raise a custom exception (handled in ganeti-noded)
raise errors.QuitGanetiException(True, 'Shutdown scheduled')
def GetNodeInfo(vgname, hypervisor_type):
- """Gives back a hash with different informations about the node.
+ """Gives back a hash with different information about the node.
@type vgname: C{string}
@param vgname: the name of the volume group to ask for disk space information
if hyp_info is not None:
outputarray.update(hyp_info)
- f = open("/proc/sys/kernel/random/boot_id", 'r')
- try:
- outputarray["bootid"] = f.read(128).rstrip("\n")
- finally:
- f.close()
+ outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
return outputarray
tmp[my_name] = ("Can't find my own primary/secondary IP"
" in the node list")
else:
- port = utils.GetNodeDaemonPort()
+ port = utils.GetDaemonPort(constants.NODED)
for name, pip, sip in what[constants.NV_NODENETTEST]:
fail = []
if not utils.TcpPing(pip, port, source=my_pip):
name, size, attr = match.groups()
inactive = attr[4] == '-'
online = attr[5] == 'o'
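+      # the attr field looks like e.g. "-wi-ao" for a plain, active,
+      # open volume; character 0 encodes the volume type, 'v' meaning
+      # virtual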
+ virtual = attr[0] == 'v'
+ if virtual:
+ # we don't want to report such volumes as existing, since they
+ # don't really hold data
+ continue
lvs[name] = (size, inactive, online)
return lvs
def GetInstanceInfo(instance, hname):
- """Gives back the informations about an instance as a dictionary.
+ """Gives back the information about an instance as a dictionary.
@type instance: string
@param instance: the instance name
for hname in hypervisor_list:
iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
if iinfo:
- for name, inst_id, memory, vcpus, state, times in iinfo:
+ for name, _, memory, vcpus, state, times in iinfo:
value = {
'memory': memory,
'vcpus': vcpus,
"""
inst_os = OSFromDisk(instance.os)
- create_env = OSEnvironment(instance)
+ create_env = OSEnvironment(instance, inst_os)
if reinstall:
create_env['INSTANCE_REINSTALL'] = "1"
"""
inst_os = OSFromDisk(instance.os)
- rename_env = OSEnvironment(instance)
+ rename_env = OSEnvironment(instance, inst_os)
rename_env['OLD_INSTANCE_NAME'] = old_name
logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
def _GetVGInfo(vg_name):
- """Get informations about the volume group.
+ """Get information about the volume group.
@type vg_name: str
@param vg_name: the volume group which we query
"pv_count": int(valarr[2]),
}
except ValueError, err:
- logging.exception("Fail to parse vgs output")
+ logging.exception("Fail to parse vgs output: %s", err)
else:
logging.error("vgs output has the wrong number of fields (expected"
" three): %s", str(valarr))
"""Remove the block device symlinks belonging to the given instance.
"""
- for idx, disk in enumerate(disks):
+ for idx, _ in enumerate(disks):
link_name = _GetBlockDevSymlinkPath(instance_name, idx)
if os.path.islink(link_name):
try:
# test every 10secs for 2min
time.sleep(1)
- for dummy in range(11):
+ for _ in range(11):
if instance.name not in GetInstanceList([hv_name]):
break
time.sleep(10)
instance OS, do not recreate the VM
- L{constants.INSTANCE_REBOOT_HARD}: tear down and
restart the VM (at the hypervisor level)
- - the other reboot type (L{constants.INSTANCE_REBOOT_HARD})
- is not accepted here, since that mode is handled
- differently
+ - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
+ not accepted here, since that mode is handled differently, in
+ cmdlib, and translates into full stop and start of the
+ instance (instead of a call_instance_reboot RPC)
@rtype: None
"""
def BlockdevShutdown(disk):
"""Shut down a block device.
- First, if the device is assembled (Attach() is successfull), then
+ First, if the device is assembled (Attach() is successful), then
the device is shutdown. Then the children of the device are
shutdown.
rbd = _RecursiveFindBD(dsk)
if rbd is None:
_Fail("Can't find device %s", dsk)
+
stats.append(rbd.CombinedSyncStatus())
+
return stats
def _RecursiveFindBD(disk):
"""Check if a device is activated.
- If so, return informations about the real device.
+ If so, return information about the real device.
@type disk: L{objects.Disk}
@param disk: the disk object we need to find
def BlockdevFind(disk):
"""Check if a device is activated.
- If it is, return informations about the real device.
+ If it is, return information about the real device.
@type disk: L{objects.Disk}
@param disk: the disk to find
- @rtype: None or tuple
- @return: None if the disk cannot be found, otherwise a
- tuple (device_path, major, minor, sync_percent,
- estimated_time, is_degraded)
+  @rtype: None or L{objects.BlockDevStatus}
+  @return: None if the disk cannot be found, otherwise the current
+      status information
"""
try:
rbd = _RecursiveFindBD(disk)
except errors.BlockDeviceError, err:
_Fail("Failed to find device: %s", err, exc=True)
+
if rbd is None:
return None
- return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
+
+ return rbd.GetSyncStatus()
+
+
+def BlockdevGetsize(disks):
+ """Computes the size of the given disks.
+
+ If a disk is not found, returns None instead.
+
+ @type disks: list of L{objects.Disk}
+  @param disks: the list of disks to compute the size for
+ @rtype: list
+ @return: list with elements None if the disk cannot be found,
+ otherwise the size
+
+ """
+ result = []
+ for cf in disks:
+ try:
+ rbd = _RecursiveFindBD(cf)
+    except errors.BlockDeviceError:
+ result.append(None)
+ continue
+ if rbd is None:
+ result.append(None)
+ else:
+ result.append(rbd.GetActualSize())
+ return result
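+  # illustrative usage: for a two-disk instance whose second disk is
+  # not found, this returns [<size of disk 0>, None]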
+
+
+def BlockdevExport(disk, dest_node, dest_path, cluster_name):
+ """Export a block device to a remote node.
+
+ @type disk: L{objects.Disk}
+ @param disk: the description of the disk to export
+ @type dest_node: str
+ @param dest_node: the destination node to export to
+ @type dest_path: str
+ @param dest_path: the destination path on the target node
+ @type cluster_name: str
+ @param cluster_name: the cluster name, needed for SSH hostalias
+ @rtype: None
+
+ """
+ real_disk = _RecursiveFindBD(disk)
+ if real_disk is None:
+ _Fail("Block device '%s' is not set up", disk)
+
+ real_disk.Open()
+
+ # the block size on the read dd is 1MiB to match our units
+ expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
+ "dd if=%s bs=1048576 count=%s",
+ real_disk.dev_path, str(disk.size))
+
+  # we set here a smaller block size as, due to ssh buffering, more
+  # than 64-128k will be mostly ignored; we use nocreat to fail if the
+  # device is not already there or we pass a wrong path; we use
+  # notrunc to not attempt to truncate an LV device; we use oflag=dsync
+  # to avoid buffering too much memory; this means that at best, we
+  # flush every 64k, which will not be very fast
+ destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
+ " oflag=dsync", dest_path)
+
+ remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
+ constants.GANETI_RUNAS,
+ destcmd)
+
+ # all commands have been checked, so we're safe to combine them
+ command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
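+  # the resulting pipeline looks roughly like this (illustrative paths):
+  #   set -e; set -o pipefail; dd if=/dev/xenvg/disk0 bs=1048576
+  #   count=1024 | ssh <dest_node> 'dd of=<dest_path>
+  #   conv=nocreat,notrunc bs=65536 oflag=dsync'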
+
+ result = utils.RunCmd(["bash", "-c", command])
+
+ if result.failed:
+ _Fail("Disk copy command '%s' returned error: %s"
+ " output: %s", command, result.fail_reason, result.output)
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
if not os.path.isabs(file_name):
_Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
- allowed_files = set([
- constants.CLUSTER_CONF_FILE,
- constants.ETC_HOSTS,
- constants.SSH_KNOWN_HOSTS_FILE,
- constants.VNC_PASSWORD_FILE,
- constants.RAPI_CERT_FILE,
- constants.RAPI_USERS_FILE,
- ])
-
- for hv_name in constants.HYPER_TYPES:
- hv_class = hypervisor.GetHypervisor(hv_name)
- allowed_files.update(hv_class.GetAncillaryFiles())
-
- if file_name not in allowed_files:
+ if file_name not in _ALLOWED_UPLOAD_FILES:
_Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
file_name)
return detail
-def _OSOndiskVersion(name, os_dir):
+def _OSOndiskAPIVersion(name, os_dir):
"""Compute and return the API version of a given OS.
This function will try to read the API version of the OS given by
      data holding either the valid versions or an error message
"""
- api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
+ api_file = os.path.sep.join([os_dir, constants.OS_API_FILE])
try:
st = os.stat(api_file)
except EnvironmentError, err:
- return False, ("Required file 'ganeti_api_version' file not"
- " found under path %s: %s" % (os_dir, _ErrnoOrStr(err)))
+ return False, ("Required file '%s' not found under path %s: %s" %
+ (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
- return False, ("File 'ganeti_api_version' file at %s is not"
- " a regular file" % os_dir)
+ return False, ("File '%s' in %s is not a regular file" %
+ (constants.OS_API_FILE, os_dir))
try:
- f = open(api_file)
- try:
- api_versions = f.readlines()
- finally:
- f.close()
+ api_versions = utils.ReadFile(api_file).splitlines()
except EnvironmentError, err:
return False, ("Error while reading the API version file at %s: %s" %
(api_file, _ErrnoOrStr(err)))
- api_versions = [version.strip() for version in api_versions]
try:
- api_versions = [int(version) for version in api_versions]
+ api_versions = [int(version.strip()) for version in api_versions]
except (TypeError, ValueError), err:
return False, ("API version(s) can't be converted to integer: %s" %
str(err))
try:
f_names = utils.ListVisibleFiles(dir_name)
except EnvironmentError, err:
- logging.exception("Can't list the OS directory %s", dir_name)
+ logging.exception("Can't list the OS directory %s: %s", dir_name, err)
break
for name in f_names:
os_path = os.path.sep.join([dir_name, name])
else:
os_dir = os.path.sep.join([base_dir, name])
- status, api_versions = _OSOndiskVersion(name, os_dir)
+ status, api_versions = _OSOndiskAPIVersion(name, os_dir)
if not status:
# push the error up
return status, api_versions
- if constants.OS_API_VERSION not in api_versions:
+ if not constants.OS_API_VERSIONS.intersection(api_versions):
return False, ("API version mismatch for path '%s': found %s, want %s." %
- (os_dir, api_versions, constants.OS_API_VERSION))
+ (os_dir, api_versions, constants.OS_API_VERSIONS))
# OS Scripts dictionary, we will populate it with the actual script names
os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
return payload
-def OSEnvironment(instance, debug=0):
+def OSEnvironment(instance, os, debug=0):
"""Calculate the environment for an os script.
@type instance: L{objects.Instance}
@param instance: target instance for the os script run
+ @type os: L{objects.OS}
+ @param os: operating system for which the environment is being built
@type debug: integer
@param debug: debug level (0 or 1, for OS Api 10)
@rtype: dict
"""
result = {}
- result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
+ api_version = max(constants.OS_API_VERSIONS.intersection(os.api_versions))
+ result['OS_API_VERSION'] = '%d' % api_version
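+  # for example, with supported OS_API_VERSIONS of {10, 15} and an OS
+  # declaring versions [5, 10], the intersection is {10} and we export
+  # OS_API_VERSION='10'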
result['INSTANCE_NAME'] = instance.name
result['INSTANCE_OS'] = instance.os
result['HYPERVISOR'] = instance.hypervisor
@rtype: None
"""
- export_env = OSEnvironment(instance)
-
inst_os = OSFromDisk(instance.os)
+ export_env = OSEnvironment(instance, inst_os)
+
export_script = inst_os.export_script
logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
# the target command is built out of three individual commands,
# which are joined by pipes; we check each individual command for
# valid parameters
- expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
- export_script, logfile)
+ expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
+ inst_os.path, export_script, logfile)
comprcmd = "gzip"
# all commands have been checked, so we're safe to combine them
command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
- result = utils.RunCmd(command, env=export_env)
+ result = utils.RunCmd(["bash", "-c", command], env=export_env)
if result.failed:
_Fail("OS snapshot export command '%s' returned error: %s"
@return: each boolean represent the success of importing the n-th disk
"""
- import_env = OSEnvironment(instance)
inst_os = OSFromDisk(instance.os)
+ import_env = OSEnvironment(instance, inst_os)
import_script = inst_os.import_script
logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
@param file_storage_dir: the directory we should cleanup
@rtype: tuple (success,)
@return: tuple of one element, C{success}, denoting
- whether the operation was successfull
+ whether the operation was successful
"""
file_storage_dir = _TransformFileStorageDir(file_storage_dir)
master, myself = ssconf.GetMasterAndMyself()
if master == myself:
_Fail("ssconf status shows I'm the master node, will not demote")
- pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
+ pid_file = utils.DaemonPidFileName(constants.MASTERD)
if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
_Fail("The master daemon is running, will not demote")
try:
- utils.CreateBackup(constants.CLUSTER_CONF_FILE)
+ if os.path.isfile(constants.CLUSTER_CONF_FILE):
+ utils.CreateBackup(constants.CLUSTER_CONF_FILE)
except EnvironmentError, err:
if err.errno != errno.ENOENT:
_Fail("Error while backing up cluster file: %s", err, exc=True)
min_resync = 100
alldone = True
- failure = False
for rd in bdevs:
stats = rd.GetProcStatus()
if not (stats.is_connected or stats.is_in_resync):
hyper = hypervisor.GetHypervisor(hypervisor_type)
try:
pid = os.fork()
- except OSError, err:
+ except OSError:
# if we can't fork, we'll pretend that we're in the child process
pid = 0
if pid > 0:
dir_name = "%s/%s" % (self._BASE_DIR, subdir)
try:
dir_contents = utils.ListVisibleFiles(dir_name)
- except OSError, err:
+ except OSError:
# FIXME: must log output in case of failures
return rr
try:
utils.WriteFile(fpath, data=fdata)
except EnvironmentError, err:
- logging.exception("Can't update bdev cache for %s", dev_path)
+ logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
@classmethod
def RemoveCache(cls, dev_path):
try:
utils.RemoveFile(fpath)
except EnvironmentError, err:
- logging.exception("Can't update bdev cache for %s", dev_path)
+ logging.exception("Can't update bdev cache for %s: %s", dev_path, err)