from ganeti import ssconf
-def _GetSshRunner():
- return ssh.SshRunner()
+def _GetConfig():
+ return ssconf.SimpleConfigReader()
-def _GetMasterInfo():
- """Return the master ip and netdev.
+def _GetSshRunner(cluster_name):
+ return ssh.SshRunner(cluster_name)
+
+
+def _CleanDirectory(path, exclude=[]):
+ """Removes all regular files in a directory.
+
+ @param exclude: List of files to be excluded.
+ @type exclude: list
+
+ """
+ if not os.path.isdir(path):
+ return
+
+ # Normalize excluded paths
+ exclude = [os.path.normpath(i) for i in exclude]
+
+ for rel_name in utils.ListVisibleFiles(path):
+ full_name = os.path.normpath(os.path.join(path, rel_name))
+ if full_name in exclude:
+ continue
+ if os.path.isfile(full_name) and not os.path.islink(full_name):
+ utils.RemoveFile(full_name)
+
+
+def JobQueuePurge():
+ """Removes job queue files and archived jobs
+
+ """
+ _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
+ _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
+
+
+def GetMasterInfo():
+ """Returns master information.
+
+ This is an utility function to compute master information, either
+ for consumption here or from the node daemon.
+
+ @rtype: tuple
+ @return: (master_netdev, master_ip, master_name)
"""
try:
- ss = ssconf.SimpleStore()
- master_netdev = ss.GetMasterNetdev()
- master_ip = ss.GetMasterIP()
+ cfg = _GetConfig()
+ master_netdev = cfg.GetMasterNetdev()
+ master_ip = cfg.GetMasterIP()
+ master_node = cfg.GetMasterNode()
except errors.ConfigurationError, err:
logging.exception("Cluster configuration incomplete")
return (None, None)
- return (master_netdev, master_ip)
+ return (master_netdev, master_ip, master_node)
def StartMaster(start_daemons):
"""
ok = True
- master_netdev, master_ip = _GetMasterInfo()
+ master_netdev, master_ip, _ = GetMasterInfo()
if not master_netdev:
return False
stop the master daemons (ganet-masterd and ganeti-rapi).
"""
- master_netdev, master_ip = _GetMasterInfo()
+ master_netdev, master_ip, _ = GetMasterInfo()
if not master_netdev:
return False
"""Cleans up the current node and prepares it to be removed from the cluster.
"""
- def _CleanDirectory(path):
- if not os.path.isdir(path):
- return
- for rel_name in utils.ListVisibleFiles(path):
- full_name = os.path.join(path, rel_name)
- if os.path.isfile(full_name) and not os.path.islink(full_name):
- utils.RemoveFile(full_name)
-
_CleanDirectory(constants.DATA_DIR)
-
- # Remove job queue files and archived jobs
- _CleanDirectory(constants.QUEUE_DIR)
- _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
+ JobQueuePurge()
try:
priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
raise errors.QuitGanetiException(False, 'Shutdown scheduled')
-def GetNodeInfo(vgname):
+def GetNodeInfo(vgname, hypervisor_type):
"""Gives back a hash with different informations about the node.
- Returns:
- { 'vg_size' : xxx, 'vg_free' : xxx, 'memory_domain0': xxx,
- 'memory_free' : xxx, 'memory_total' : xxx }
- where
- vg_size is the size of the configured volume group in MiB
- vg_free is the free size of the volume group in MiB
- memory_dom0 is the memory allocated for domain0 in MiB
- memory_free is the currently available (free) ram in MiB
- memory_total is the total number of ram in MiB
+ @type vgname: C{string}
+ @param vgname: the name of the volume group to ask for disk space information
+ @type hypervisor_type: C{str}
+ @param hypervisor_type: the name of the hypervisor to ask for
+ memory information
+ @rtype: C{dict}
+ @return: dictionary with the following keys:
+ - vg_size is the size of the configured volume group in MiB
+ - vg_free is the free size of the volume group in MiB
+ - memory_dom0 is the memory allocated for domain0 in MiB
+ - memory_free is the currently available (free) ram in MiB
+ - memory_total is the total number of ram in MiB
"""
outputarray = {}
outputarray['vg_size'] = vginfo['vg_size']
outputarray['vg_free'] = vginfo['vg_free']
- hyper = hypervisor.GetHypervisor()
+ hyper = hypervisor.GetHypervisor(hypervisor_type)
hyp_info = hyper.GetNodeInfo()
if hyp_info is not None:
outputarray.update(hyp_info)
return outputarray
-def VerifyNode(what):
+def VerifyNode(what, cluster_name):
"""Verify the status of the local node.
- Args:
- what - a dictionary of things to check:
- 'filelist' : list of files for which to compute checksums
- 'nodelist' : list of nodes we should check communication with
- 'hypervisor': run the hypervisor-specific verify
+ Based on the input L{what} parameter, various checks are done on the
+ local node.
+
+ If the I{filelist} key is present, this list of
+ files is checksummed and the file/checksum pairs are returned.
+
+ If the I{nodelist} key is present, we check that we have
+ connectivity via ssh with the target nodes (and check the hostname
+ report).
+
+ If the I{node-net-test} key is present, we check that we have
+ connectivity to the given nodes via both primary IP and, if
+ applicable, secondary IPs.
- Requested files on local node are checksummed and the result returned.
+ @type what: C{dict}
+ @param what: a dictionary of things to check:
+ - filelist: list of files for which to compute checksums
+ - nodelist: list of nodes we should check ssh communication with
+ - node-net-test: list of nodes we should check node daemon port
+ connectivity with
+ - hypervisor: list with hypervisors to run the verify for
- The nodelist is traversed, with the following checks being made
- for each node:
- - known_hosts key correct
- - correct resolving of node name (target node returns its own hostname
- by ssh-execution of 'hostname', result compared against name in list.
"""
result = {}
if 'hypervisor' in what:
- result['hypervisor'] = hypervisor.GetHypervisor().Verify()
+ result['hypervisor'] = my_dict = {}
+ for hv_name in what['hypervisor']:
+ my_dict[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
if 'filelist' in what:
result['filelist'] = utils.FingerprintFiles(what['filelist'])
result['nodelist'] = {}
random.shuffle(what['nodelist'])
for node in what['nodelist']:
- success, message = _GetSshRunner().VerifyNodeHostname(node)
+ success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
if not success:
result['nodelist'][node] = message
if 'node-net-test' in what:
" primary/secondary IP"
" in the node list")
else:
- port = ssconf.SimpleStore().GetNodeDaemonPort()
+ port = utils.GetNodeDaemonPort()
for name, pip, sip in what['node-net-test']:
fail = []
if not utils.TcpPing(pip, port, source=my_pip):
return True
-def GetInstanceList():
+def GetInstanceList(hypervisor_list):
"""Provides a list of instances.
- Returns:
- A list of all running instances on the current node
- - instance1.example.com
- - instance2.example.com
+ @type hypervisor_list: list
+ @param hypervisor_list: the list of hypervisors to query information
+
+ @rtype: list
+ @return: a list of all running instances on the current node
+ - instance1.example.com
+ - instance2.example.com
"""
- try:
- names = hypervisor.GetHypervisor().ListInstances()
- except errors.HypervisorError, err:
- logging.exception("Error enumerating instances")
- raise
+ results = []
+ for hname in hypervisor_list:
+ try:
+ names = hypervisor.GetHypervisor(hname).ListInstances()
+ results.extend(names)
+ except errors.HypervisorError, err:
+ logging.exception("Error enumerating instances for hypevisor %s", hname)
+ # FIXME: should we somehow not propagate this to the master?
+ raise
- return names
+ return results
-def GetInstanceInfo(instance):
+def GetInstanceInfo(instance, hname):
"""Gives back the informations about an instance as a dictionary.
- Args:
- instance: name of the instance (ex. instance1.example.com)
+ @type instance: string
+ @param instance: the instance name
+ @type hname: string
+ @param hname: the hypervisor type of the instance
- Returns:
- { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
- where
- memory: memory size of instance (int)
- state: xen state of instance (string)
- time: cpu time of instance (float)
+ @rtype: dict
+ @return: dictionary with the following keys:
+ - memory: memory size of instance (int)
+ - state: xen state of instance (string)
+ - time: cpu time of instance (float)
"""
output = {}
- iinfo = hypervisor.GetHypervisor().GetInstanceInfo(instance)
+ iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
if iinfo is not None:
output['memory'] = iinfo[2]
output['state'] = iinfo[4]
return output
-def GetAllInstancesInfo():
+def GetAllInstancesInfo(hypervisor_list):
"""Gather data about all instances.
This is the equivalent of `GetInstanceInfo()`, except that it
computes data for all instances at once, thus being faster if one
needs data about more than one instance.
- Returns: a dictionary of dictionaries, keys being the instance name,
- and with values:
- { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
- where
- memory: memory size of instance (int)
- state: xen state of instance (string)
- time: cpu time of instance (float)
- vcpus: the number of cpus
+ @type hypervisor_list: list
+ @param hypervisor_list: list of hypervisors to query for instance data
+
+ @rtype: dict of dicts
+ @return: dictionary of instance: data, with data having the following keys:
+ - memory: memory size of instance (int)
+ - state: xen state of instance (string)
+ - time: cpu time of instance (float)
+ - vcpuus: the number of vcpus
"""
output = {}
- iinfo = hypervisor.GetHypervisor().GetAllInstancesInfo()
- if iinfo:
- for name, inst_id, memory, vcpus, state, times in iinfo:
- output[name] = {
- 'memory': memory,
- 'vcpus': vcpus,
- 'state': state,
- 'time': times,
- }
+ for hname in hypervisor_list:
+ iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
+ if iinfo:
+ for name, inst_id, memory, vcpus, state, times in iinfo:
+ if name in output:
+ raise errors.HypervisorError("Instance %s running duplicate" % name)
+ output[name] = {
+ 'memory': memory,
+ 'vcpus': vcpus,
+ 'state': state,
+ 'time': times,
+ }
return output
inst_os.path, create_script, instance.name,
real_os_dev.dev_path, real_swap_dev.dev_path,
logfile)
+ env = {'HYPERVISOR': instance.hypervisor}
- result = utils.RunCmd(command)
+ result = utils.RunCmd(command, env=env)
if result.failed:
logging.error("os create command '%s' returned error: %s, logfile: %s,"
" output: %s", command, result.fail_reason, logfile,
def StartInstance(instance, extra_args):
"""Start an instance.
- Args:
- instance - name of instance to start.
+ @type instance: instance object
+ @param instance: the instance object
+ @rtype: boolean
+ @return: whether the startup was successful or not
"""
- running_instances = GetInstanceList()
+ running_instances = GetInstanceList([instance.hypervisor])
if instance.name in running_instances:
return True
block_devices = _GatherBlockDevs(instance)
- hyper = hypervisor.GetHypervisor()
+ hyper = hypervisor.GetHypervisor(instance.hypervisor)
try:
hyper.StartInstance(instance, block_devices, extra_args)
def ShutdownInstance(instance):
"""Shut an instance down.
- Args:
- instance - name of instance to shutdown.
+ @type instance: instance object
+ @param instance: the instance object
+ @rtype: boolean
+ @return: whether the startup was successful or not
"""
- running_instances = GetInstanceList()
+ hv_name = instance.hypervisor
+ running_instances = GetInstanceList([hv_name])
if instance.name not in running_instances:
return True
- hyper = hypervisor.GetHypervisor()
+ hyper = hypervisor.GetHypervisor(hv_name)
try:
hyper.StopInstance(instance)
except errors.HypervisorError, err:
time.sleep(1)
for dummy in range(11):
- if instance.name not in GetInstanceList():
+ if instance.name not in GetInstanceList([hv_name]):
break
time.sleep(10)
else:
return False
time.sleep(1)
- if instance.name in GetInstanceList():
+ if instance.name in GetInstanceList([hv_name]):
logging.error("could not shutdown instance '%s' even by destroy",
instance.name)
return False
reboot_type - how to reboot [soft,hard,full]
"""
- running_instances = GetInstanceList()
+ running_instances = GetInstanceList([instance.hypervisor])
if instance.name not in running_instances:
logging.error("Cannot reboot instance that is not running")
return False
- hyper = hypervisor.GetHypervisor()
+ hyper = hypervisor.GetHypervisor(instance.hypervisor)
if reboot_type == constants.INSTANCE_REBOOT_SOFT:
try:
hyper.RebootInstance(instance)
else:
raise errors.ParameterError("reboot_type invalid")
-
return True
def MigrateInstance(instance, target, live):
"""Migrates an instance to another node.
+ @type instance: C{objects.Instance}
+ @param instance: the instance definition
+ @type target: string
+ @param target: the target node name
+ @type live: boolean
+ @param live: whether the migration should be done live or not (the
+ interpretation of this parameter is left to the hypervisor)
+ @rtype: tuple
+ @return: a tuple of (success, msg) where:
+ - succes is a boolean denoting the success/failure of the operation
+ - msg is a string with details in case of failure
+
"""
- hyper = hypervisor.GetHypervisor()
+ hyper = hypervisor.GetHypervisor(instance.hypervisor_name)
try:
- hyper.MigrateInstance(instance, target, live)
+ hyper.MigrateInstance(instance.name, target, live)
except errors.HypervisorError, err:
msg = "Failed to migrate instance: %s" % str(err)
logging.error(msg)
return rbd
return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
-def _IsJobQueueFile(file_name):
- queue_dir = os.path.normpath(constants.QUEUE_DIR)
- return os.path.commonprefix([queue_dir, file_name]) == queue_dir
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
"""Write a file to the filesystem.
constants.SSH_KNOWN_HOSTS_FILE,
constants.VNC_PASSWORD_FILE,
]
- allowed_files.extend(ssconf.SimpleStore().GetFileList())
- if not (file_name in allowed_files or _IsJobQueueFile(file_name)):
+ if file_name not in allowed_files:
logging.error("Filename passed to UploadFile not in allowed"
" upload targets: '%s'", file_name)
return False
(disk.unique_id, disk.dev_type))
-def ExportSnapshot(disk, dest_node, instance):
+def ExportSnapshot(disk, dest_node, instance, cluster_name):
"""Export a block device snapshot to a remote node.
Args:
destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
destdir, destdir, destfile)
- remotecmd = _GetSshRunner().BuildCmd(dest_node, constants.GANETI_RUNAS,
- destcmd)
+ remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
+ constants.GANETI_RUNAS,
+ destcmd)
# all commands have been checked, so we're safe to combine them
command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
return config
-def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
+def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image,
+ cluster_name):
"""Import an os image into an instance.
Args:
os.mkdir(constants.LOG_OS_DIR, 0750)
destcmd = utils.BuildShellCmd('cat %s', src_image)
- remotecmd = _GetSshRunner().BuildCmd(src_node, constants.GANETI_RUNAS,
- destcmd)
+ remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
+ constants.GANETI_RUNAS,
+ destcmd)
comprcmd = "gunzip"
impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
logfile)
command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
+ env = {'HYPERVISOR': instance.hypervisor}
- result = utils.RunCmd(command)
+ result = utils.RunCmd(command, env=env)
if result.failed:
logging.error("os import command '%s' returned error: %s"
normalized file_storage_dir (string) if valid, None otherwise
"""
+ cfg = _GetConfig()
file_storage_dir = os.path.normpath(file_storage_dir)
- base_file_storage_dir = ssconf.SimpleStore().GetFileStorageDir()
+ base_file_storage_dir = cfg.GetFileStorageDir()
if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
base_file_storage_dir):
logging.error("file storage directory '%s' is not under base file"
return result
+def _IsJobQueueFile(file_name):
+ """Checks whether the given filename is in the queue directory.
+
+ """
+ queue_dir = os.path.normpath(constants.QUEUE_DIR)
+ result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
+
+ if not result:
+ logging.error("'%s' is not a file in the queue directory",
+ file_name)
+
+ return result
+
+
+def JobQueueUpdate(file_name, content):
+ """Updates a file in the queue directory.
+
+ """
+ if not _IsJobQueueFile(file_name):
+ return False
+
+ # Write and replace the file atomically
+ utils.WriteFile(file_name, data=content)
+
+ return True
+
+
+def JobQueueRename(old, new):
+ """Renames a job queue file.
+
+ """
+ if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
+ return False
+
+ os.rename(old, new)
+
+ return True
+
+
def CloseBlockDevices(disks):
"""Closes the given block devices.