import random
import logging
import tempfile
+import zlib
+import base64
from ganeti import errors
from ganeti import utils
return ssh.SshRunner(cluster_name)
-def _CleanDirectory(path, exclude=[]):
+def _Decompress(data):
+  """Unpacks data compressed by the RPC client.
+
+  @type data: list or tuple
+  @param data: Data sent by RPC client
+  @rtype: str
+  @return: Decompressed data
+
+  """
+  # NOTE(review): asserts are stripped under python -O; if a malformed RPC
+  # payload can reach this point, consider raising an explicit error
+  # instead -- TODO confirm callers always send a (encoding, content) pair
+  assert isinstance(data, (list, tuple))
+  assert len(data) == 2
+  (encoding, content) = data
+  if encoding == constants.RPC_ENCODING_NONE:
+    # plain payload, pass through untouched
+    return content
+  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
+    # sender base64-wrapped a zlib stream for safe transport; reverse both
+    return zlib.decompress(base64.b64decode(content))
+  else:
+    raise AssertionError("Unknown data encoding")
+
+
+def _CleanDirectory(path, exclude=None):
"""Removes all regular files in a directory.
@type path: str
@type exclude: list
@param exclude: list of files to be excluded, defaults
to the empty list
- @rtype: None
"""
if not os.path.isdir(path):
return
-
- # Normalize excluded paths
- exclude = [os.path.normpath(i) for i in exclude]
+ if exclude is None:
+ exclude = []
+ else:
+ # Normalize excluded paths
+ exclude = [os.path.normpath(i) for i in exclude]
for rel_name in utils.ListVisibleFiles(path):
full_name = os.path.normpath(os.path.join(path, rel_name))
from the cluster.
If processing is successful, then it raises an
- L{errors.GanetiQuitException} which is used as a special case to
+ L{errors.QuitGanetiException} which is used as a special case to
shutdown the node daemon.
"""
"""
result = {}
- if 'hypervisor' in what:
- result['hypervisor'] = my_dict = {}
- for hv_name in what['hypervisor']:
- my_dict[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
+ if constants.NV_HYPERVISOR in what:
+ result[constants.NV_HYPERVISOR] = tmp = {}
+ for hv_name in what[constants.NV_HYPERVISOR]:
+ tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
- if 'filelist' in what:
- result['filelist'] = utils.FingerprintFiles(what['filelist'])
+ if constants.NV_FILELIST in what:
+ result[constants.NV_FILELIST] = utils.FingerprintFiles(
+ what[constants.NV_FILELIST])
- if 'nodelist' in what:
- result['nodelist'] = {}
- random.shuffle(what['nodelist'])
- for node in what['nodelist']:
+ if constants.NV_NODELIST in what:
+ result[constants.NV_NODELIST] = tmp = {}
+ random.shuffle(what[constants.NV_NODELIST])
+ for node in what[constants.NV_NODELIST]:
success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
if not success:
- result['nodelist'][node] = message
- if 'node-net-test' in what:
- result['node-net-test'] = {}
+ tmp[node] = message
+
+ if constants.NV_NODENETTEST in what:
+ result[constants.NV_NODENETTEST] = tmp = {}
my_name = utils.HostInfo().name
my_pip = my_sip = None
- for name, pip, sip in what['node-net-test']:
+ for name, pip, sip in what[constants.NV_NODENETTEST]:
if name == my_name:
my_pip = pip
my_sip = sip
break
if not my_pip:
- result['node-net-test'][my_name] = ("Can't find my own"
- " primary/secondary IP"
- " in the node list")
+ tmp[my_name] = ("Can't find my own primary/secondary IP"
+ " in the node list")
else:
port = utils.GetNodeDaemonPort()
- for name, pip, sip in what['node-net-test']:
+ for name, pip, sip in what[constants.NV_NODENETTEST]:
fail = []
if not utils.TcpPing(pip, port, source=my_pip):
fail.append("primary")
if not utils.TcpPing(sip, port, source=my_sip):
fail.append("secondary")
if fail:
- result['node-net-test'][name] = ("failure using the %s"
- " interface(s)" %
- " and ".join(fail))
+ tmp[name] = ("failure using the %s interface(s)" %
+ " and ".join(fail))
+
+ if constants.NV_LVLIST in what:
+ result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
+
+ if constants.NV_INSTANCELIST in what:
+ result[constants.NV_INSTANCELIST] = GetInstanceList(
+ what[constants.NV_INSTANCELIST])
+
+ if constants.NV_VGLIST in what:
+ result[constants.NV_VGLIST] = ListVolumeGroups()
+
+ if constants.NV_VERSION in what:
+ result[constants.NV_VERSION] = constants.PROTOCOL_VERSION
+
+ if constants.NV_HVINFO in what:
+ hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
+ result[constants.NV_HVINFO] = hyper.GetNodeInfo()
return result
return retdic
-def _GatherBlockDevs(instance):
+def _SymlinkBlockDev(instance_name, device_path, device_name):
+  """Set up symlinks to a instance's block device.
+
+  This is an auxiliary function run when an instance is start (on the primary
+  node) or when an instance is migrated (on the target node).
+
+  @type instance_name: str
+  @param instance_name: the name of the target instance
+  @type device_path: str
+  @param device_path: path of the physical block device, on the node
+  @type device_name: str
+  @param device_name: 'virtual' name of the device
+  @rtype: str
+  @return: absolute path to the disk's symlink
+
+  """
+  link_basename = "%s-%s" % (instance_name, device_name)
+  link_name = os.path.join(constants.DISK_LINKS_DIR, link_basename)
+  try:
+    os.symlink(device_path, link_name)
+  except OSError, e:
+    if e.errno == errno.EEXIST:
+      # a link already exists: replace it only when it is not a symlink or
+      # points at a different device (stale leftover from an earlier run)
+      if (not os.path.islink(link_name) or
+          os.readlink(link_name) != device_path):
+        os.remove(link_name)
+        os.symlink(device_path, link_name)
+    else:
+      raise
+
+  return link_name
+
+
+def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).
  This is run on the primary node at instance startup. The block
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we shoul assemble
-  @rtype: list of L{bdev.BlockDev}
-  @return: list of the block devices
+  @rtype: list
+  @return: list of (disk_object, link_name) pairs
  """
  block_devices = []
-  for disk in instance.disks:
+  # enumerate so every disk gets a stable, index-based link name ("disk0", ...)
+  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
-    block_devices.append((disk, device))
+    # expose the device through a stable symlink; callers receive the link
+    # path rather than the raw (node-dependent) device path
+    try:
+      link_name = _SymlinkBlockDev(instance.name, device.dev_path,
+                                   "disk%d" % idx)
+    except OSError, e:
+      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
+                                    e.strerror)
+
+    block_devices.append((disk, link_name))
+
  return block_devices
if instance.name in running_instances:
return True
- block_devices = _GatherBlockDevs(instance)
+ block_devices = _GatherAndLinkBlockDevs(instance)
hyper = hypervisor.GetHypervisor(instance.hypervisor)
try:
return False
# test every 10secs for 2min
- shutdown_ok = False
time.sleep(1)
for dummy in range(11):
@note: This is intended to be called recursively.
- @type disk: L{objects.disk}
+ @type disk: L{objects.Disk}
@param disk: the disk object we should remove
@rtype: boolean
@return: the success of the operation
"""
try:
- # since we are removing the device, allow a partial match
- # this allows removal of broken mirrors
- rdev = _RecursiveFindBD(disk, allow_partial=True)
+ rdev = _RecursiveFindBD(disk)
except errors.BlockDeviceError, err:
# probably can't attach
logging.info("Can't attach to device %s in remove", disk)
def ShutdownBlockDevice(disk):
"""Shut down a block device.
- First, if the device is assembled (can L{Attach()}), then the device
- is shutdown. Then the children of the device are shutdown.
+ First, if the device is assembled (Attach() is successfull), then
+ the device is shutdown. Then the children of the device are
+ shutdown.
This function is called recursively. Note that we don't cache the
children or such, as oppossed to assemble, shutdown of different
@return: the success of the operation
"""
- parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
+ parent_bdev = _RecursiveFindBD(parent_cdev)
if parent_bdev is None:
logging.error("Can't find parent device")
return False
@rtype: disk
@return:
a list of (mirror_done, estimated_time) tuples, which
- are the result of L{bdev.BlockDevice.CombinedSyncStatus}
+ are the result of L{bdev.BlockDev.CombinedSyncStatus}
@raise errors.BlockDeviceError: if any of the disks cannot be
found
return stats
-def _RecursiveFindBD(disk, allow_partial=False):
+def _RecursiveFindBD(disk):
"""Check if a device is activated.
If so, return informations about the real device.
@type disk: L{objects.Disk}
@param disk: the disk object we need to find
- @type allow_partial: boolean
- @param allow_partial: if true, don't abort the find if a
- child of the device can't be found; this is intended
- to be used when repairing mirrors
@return: None if the device can't be found,
otherwise the device instance
" upload targets: '%s'", file_name)
return False
- utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
+ raw_data = _Decompress(data)
+
+ utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
atime=atime, mtime=mtime)
return True
instance.beparams[constants.BE_VCPUS])
config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
- nic_count = 0
+ nic_total = 0
for nic_count, nic in enumerate(instance.nics):
+ nic_total += 1
config.set(constants.INISECT_INS, 'nic%d_mac' %
nic_count, '%s' % nic.mac)
config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
'%s' % nic.bridge)
# TODO: redundant: on load can read nics until it doesn't exist
- config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
+ config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
disk_total = 0
for disk_count, disk in enumerate(snap_disks):
return False
# Write and replace the file atomically
- utils.WriteFile(file_name, data=content)
+ utils.WriteFile(file_name, data=_Decompress(content))
return True
def JobQueueRename(old, new):
"""Renames a job queue file.
- This is just a wrapper over L{os.rename} with proper checking.
+ This is just a wrapper over os.rename with proper checking.
@type old: str
@param old: the old (actual) file name
if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
return False
- os.rename(old, new)
+ utils.RenameFile(old, new, mkdir=True)
return True
return (False, str(err))
+def DemoteFromMC():
+  """Demotes the current node from master candidate role.
+
+  @rtype: tuple
+  @return: (success, message) pair
+
+  """
+  # try to ensure we're not the master by mistake
+  master, myself = ssconf.GetMasterAndMyself()
+  if master == myself:
+    return (False, "ssconf status shows I'm the master node, will not demote")
+  # also refuse when a master daemon is (still) running on this node
+  pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
+  if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
+    return (False, "The master daemon is running, will not demote")
+  try:
+    # preserve a backup of the cluster config before removing it
+    utils.CreateBackup(constants.CLUSTER_CONF_FILE)
+  except EnvironmentError, err:
+    if err.errno != errno.ENOENT:
+      return (False, "Error while backing up cluster file: %s" % str(err))
+    # ENOENT is fine: no config file present means nothing to back up
+  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
+  return (True, "Done")
+
+
class HooksRunner(object):
"""Hook runner.
node nor not
@type iv_name: str
@param iv_name: the instance-visible name of the
- device, as in L{objects.Disk.iv_name}
+ device, as in objects.Disk.iv_name
@rtype: None