X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/19d7f90acdbd52e2aef1039ca2db6be348249d09..e9ce0a64358758dfdd28cdb13e19ea7fb72ac96c:/lib/backend.py diff --git a/lib/backend.py b/lib/backend.py index 6aa8c83..c956ecc 100644 --- a/lib/backend.py +++ b/lib/backend.py @@ -33,6 +33,8 @@ import subprocess import random import logging import tempfile +import zlib +import base64 from ganeti import errors from ganeti import utils @@ -45,25 +47,65 @@ from ganeti import ssconf def _GetConfig(): - return ssconf.SimpleConfigReader() + """Simple wrapper to return a SimpleStore. + + @rtype: L{ssconf.SimpleStore} + @return: a SimpleStore instance + + """ + return ssconf.SimpleStore() def _GetSshRunner(cluster_name): + """Simple wrapper to return an SshRunner. + + @type cluster_name: str + @param cluster_name: the cluster name, which is needed + by the SshRunner constructor + @rtype: L{ssh.SshRunner} + @return: an SshRunner instance + + """ return ssh.SshRunner(cluster_name) -def _CleanDirectory(path, exclude=[]): +def _Decompress(data): + """Unpacks data compressed by the RPC client. + + @type data: list or tuple + @param data: Data sent by RPC client + @rtype: str + @return: Decompressed data + + """ + assert isinstance(data, (list, tuple)) + assert len(data) == 2 + (encoding, content) = data + if encoding == constants.RPC_ENCODING_NONE: + return content + elif encoding == constants.RPC_ENCODING_ZLIB_BASE64: + return zlib.decompress(base64.b64decode(content)) + else: + raise AssertionError("Unknown data encoding") + + +def _CleanDirectory(path, exclude=None): """Removes all regular files in a directory. - @param exclude: List of files to be excluded. + @type path: str + @param path: the directory to clean @type exclude: list + @param exclude: list of files to be excluded, defaults + to the empty list """ if not os.path.isdir(path): return - - # Normalize excluded paths - exclude = [os.path.normpath(i) for i in exclude] + if exclude is None: + exclude = [] + else: + # Normalize excluded paths + exclude = [os.path.normpath(i) for i in exclude] for rel_name in utils.ListVisibleFiles(path): full_name = os.path.normpath(os.path.join(path, rel_name)) @@ -74,7 +116,9 @@ def _CleanDirectory(path, exclude=[]): def JobQueuePurge(): - """Removes job queue files and archived jobs + """Removes job queue files and archived jobs. + + @rtype: None """ _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE]) @@ -88,7 +132,8 @@ def GetMasterInfo(): for consumption here or from the node daemon. @rtype: tuple - @return: (master_netdev, master_ip, master_name) + @return: (master_netdev, master_ip, master_name) if we have a good + configuration, otherwise (None, None, None) """ try: @@ -98,7 +143,7 @@ def GetMasterInfo(): master_node = cfg.GetMasterNode() except errors.ConfigurationError, err: logging.exception("Cluster configuration incomplete") - return (None, None) + return (None, None, None) return (master_netdev, master_ip, master_node) @@ -106,9 +151,13 @@ def StartMaster(start_daemons): """Activate local node as master node. The function will always try activate the IP address of the master - (if someone else has it, then it won't). Then, if the start_daemons - parameter is True, it will also start the master daemons - (ganet-masterd and ganeti-rapi). + (unless someone else has it). It will also start the master daemons, + based on the start_daemons parameter. 
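As an illustration of the packing scheme the new _Decompress helper above expects from the RPC client, here is a minimal standalone sketch of both directions; the encoding constants below are placeholder values, not the real ganeti.constants definitions, and the size threshold is only an assumption.

import base64
import zlib

RPC_ENCODING_NONE = 0           # placeholder values, not the real constants
RPC_ENCODING_ZLIB_BASE64 = 1

def _Compress(data, threshold=4096):
  # small payloads are sent as-is, larger ones zlib-compressed and base64'd
  if len(data) < threshold:
    return (RPC_ENCODING_NONE, data)
  return (RPC_ENCODING_ZLIB_BASE64, base64.b64encode(zlib.compress(data)))

def _Decompress(packed):
  # mirror of the backend helper above, for a round-trip check
  encoding, content = packed
  if encoding == RPC_ENCODING_NONE:
    return content
  return zlib.decompress(base64.b64decode(content))

assert _Decompress(_Compress("x" * 10000)) == "x" * 10000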
+ + @type start_daemons: boolean + @param start_daemons: whther to also start the master + daemons (ganeti-masterd and ganeti-rapi) + @rtype: None """ ok = True @@ -149,8 +198,13 @@ def StopMaster(stop_daemons): """Deactivate this node as master. The function will always try to deactivate the IP address of the - master. Then, if the stop_daemons parameter is True, it will also - stop the master daemons (ganet-masterd and ganeti-rapi). + master. It will also stop the master daemons depending on the + stop_daemons parameter. + + @type stop_daemons: boolean + @param stop_daemons: whether to also stop the master daemons + (ganeti-masterd and ganeti-rapi) + @rtype: None """ master_netdev, master_ip, _ = GetMasterInfo() @@ -179,6 +233,21 @@ def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub): - adds the ssh private key to the user - adds the ssh public key to the users' authorized_keys file + @type dsa: str + @param dsa: the DSA private key to write + @type dsapub: str + @param dsapub: the DSA public key to write + @type rsa: str + @param rsa: the RSA private key to write + @type rsapub: str + @param rsapub: the RSA public key to write + @type sshkey: str + @param sshkey: the SSH private key to write + @type sshpub: str + @param sshpub: the SSH public key to write + @rtype: boolean + @return: the success of the operation + """ sshd_keys = [(constants.SSH_HOST_RSA_PRIV, rsa, 0600), (constants.SSH_HOST_RSA_PUB, rsapub, 0644), @@ -205,7 +274,14 @@ def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub): def LeaveCluster(): - """Cleans up the current node and prepares it to be removed from the cluster. + """Cleans up and remove the current node. + + This function cleans up and prepares the current node to be removed + from the cluster. + + If processing is successful, then it raises an + L{errors.QuitGanetiException} which is used as a special case to + shutdown the node daemon. 
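The sshd_keys list in AddNode above pairs each target path with its contents and file mode; the following standalone sketch shows how such (path, data, mode) triples can be applied, using plain os calls and throwaway paths in place of ganeti's utils.WriteFile and the real key locations.

import os

def write_keys(key_triples):
  for path, data, mode in key_triples:
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode)
    try:
      os.write(fd, data)
    finally:
      os.close(fd)
    os.chmod(path, mode)  # enforce the mode even if the file already existed

write_keys([("/tmp/example_host_rsa", "private key material\n", 0600),
            ("/tmp/example_host_rsa.pub", "public key material\n", 0644)])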
""" _CleanDirectory(constants.DATA_DIR) @@ -290,41 +366,45 @@ def VerifyNode(what, cluster_name): - node-net-test: list of nodes we should check node daemon port connectivity with - hypervisor: list with hypervisors to run the verify for + @rtype: dict + @return: a dictionary with the same keys as the input dict, and + values representing the result of the checks """ result = {} - if 'hypervisor' in what: - result['hypervisor'] = my_dict = {} - for hv_name in what['hypervisor']: - my_dict[hv_name] = hypervisor.GetHypervisor(hv_name).Verify() + if constants.NV_HYPERVISOR in what: + result[constants.NV_HYPERVISOR] = tmp = {} + for hv_name in what[constants.NV_HYPERVISOR]: + tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify() - if 'filelist' in what: - result['filelist'] = utils.FingerprintFiles(what['filelist']) + if constants.NV_FILELIST in what: + result[constants.NV_FILELIST] = utils.FingerprintFiles( + what[constants.NV_FILELIST]) - if 'nodelist' in what: - result['nodelist'] = {} - random.shuffle(what['nodelist']) - for node in what['nodelist']: + if constants.NV_NODELIST in what: + result[constants.NV_NODELIST] = tmp = {} + random.shuffle(what[constants.NV_NODELIST]) + for node in what[constants.NV_NODELIST]: success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node) if not success: - result['nodelist'][node] = message - if 'node-net-test' in what: - result['node-net-test'] = {} + tmp[node] = message + + if constants.NV_NODENETTEST in what: + result[constants.NV_NODENETTEST] = tmp = {} my_name = utils.HostInfo().name my_pip = my_sip = None - for name, pip, sip in what['node-net-test']: + for name, pip, sip in what[constants.NV_NODENETTEST]: if name == my_name: my_pip = pip my_sip = sip break if not my_pip: - result['node-net-test'][my_name] = ("Can't find my own" - " primary/secondary IP" - " in the node list") + tmp[my_name] = ("Can't find my own primary/secondary IP" + " in the node list") else: port = utils.GetNodeDaemonPort() - for name, pip, sip in what['node-net-test']: + for name, pip, sip in what[constants.NV_NODENETTEST]: fail = [] if not utils.TcpPing(pip, port, source=my_pip): fail.append("primary") @@ -332,9 +412,34 @@ def VerifyNode(what, cluster_name): if not utils.TcpPing(sip, port, source=my_sip): fail.append("secondary") if fail: - result['node-net-test'][name] = ("failure using the %s" - " interface(s)" % - " and ".join(fail)) + tmp[name] = ("failure using the %s interface(s)" % + " and ".join(fail)) + + if constants.NV_LVLIST in what: + result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST]) + + if constants.NV_INSTANCELIST in what: + result[constants.NV_INSTANCELIST] = GetInstanceList( + what[constants.NV_INSTANCELIST]) + + if constants.NV_VGLIST in what: + result[constants.NV_VGLIST] = ListVolumeGroups() + + if constants.NV_VERSION in what: + result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION, + constants.RELEASE_VERSION) + + if constants.NV_HVINFO in what: + hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO]) + result[constants.NV_HVINFO] = hyper.GetNodeInfo() + + if constants.NV_DRBDLIST in what: + try: + used_minors = bdev.DRBD8.GetUsedDevs().keys() + except errors.BlockDeviceError: + logging.warning("Can't get used minors list", exc_info=True) + used_minors = [] + result[constants.NV_DRBDLIST] = used_minors return result @@ -342,10 +447,17 @@ def VerifyNode(what, cluster_name): def GetVolumeList(vg_name): """Compute list of logical volumes and their size. 
- Returns: - dictionary of all partions (key) with their size (in MiB), inactive - and online status: - {'test1': ('20.06', True, True)} + @type vg_name: str + @param vg_name: the volume group whose LVs we should list + @rtype: dict + @return: + dictionary of all partions (key) with value being a tuple of + their size (in MiB), inactive and online status:: + + {'test1': ('20.06', True, True)} + + in case of errors, a string is returned with the error + details. """ lvs = {} @@ -376,8 +488,9 @@ def GetVolumeList(vg_name): def ListVolumeGroups(): """List the volume groups and their size. - Returns: - Dictionary with keys volume name and values the size of the volume + @rtype: dict + @return: dictionary with keys volume name and values the + size of the volume """ return utils.ListVolumeGroups() @@ -386,6 +499,21 @@ def ListVolumeGroups(): def NodeVolumes(): """List all volumes on this node. + @rtype: list + @return: + A list of dictionaries, each having four keys: + - name: the logical volume name, + - size: the size of the logical volume + - dev: the physical device on which the LV lives + - vg: the volume group to which it belongs + + In case of errors, we return an empty list and log the + error. + + Note that since a logical volume can live on multiple physical + volumes, the resulting list might include a logical volume + multiple times. + """ result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix", "--separator=|", @@ -393,7 +521,7 @@ def NodeVolumes(): if result.failed: logging.error("Failed to list logical volumes, lvs output: %s", result.output) - return {} + return [] def parse_dev(dev): if '(' in dev: @@ -416,8 +544,8 @@ def NodeVolumes(): def BridgesExist(bridges_list): """Check if a list of bridges exist on the current node. - Returns: - True if all of them exist, false otherwise + @rtype: boolean + @return: C{True} if all of them exist, C{False} otherwise """ for bridge in bridges_list: @@ -435,8 +563,8 @@ def GetInstanceList(hypervisor_list): @rtype: list @return: a list of all running instances on the current node - - instance1.example.com - - instance2.example.com + - instance1.example.com + - instance2.example.com """ results = [] @@ -478,22 +606,46 @@ def GetInstanceInfo(instance, hname): return output +def GetInstanceMigratable(instance): + """Gives whether an instance can be migrated. + + @type instance: L{objects.Instance} + @param instance: object representing the instance to be checked. + + @rtype: tuple + @return: tuple of (result, description) where: + - result: whether the instance can be migrated or not + - description: a description of the issue, if relevant + + """ + hyper = hypervisor.GetHypervisor(instance.hypervisor) + if instance.name not in hyper.ListInstances(): + return (False, 'not running') + + for idx in range(len(instance.disks)): + link_name = _GetBlockDevSymlinkPath(instance.name, idx) + if not os.path.islink(link_name): + return (False, 'not restarted since ganeti 1.2.5') + + return (True, '') + + def GetAllInstancesInfo(hypervisor_list): """Gather data about all instances. - This is the equivalent of `GetInstanceInfo()`, except that it + This is the equivalent of L{GetInstanceInfo}, except that it computes data for all instances at once, thus being faster if one needs data about more than one instance. 
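A small consumer-side sketch of the GetVolumeList return format documented above, a dict mapping each LV name to a (size_in_MiB, inactive, online) tuple:

lv_data = {"test1": ("20.06", True, True),
           "test2": ("102.40", False, True)}

def summarize_lvs(lvs):
  total_mib = 0.0
  needs_attention = []
  for name, (size, is_inactive, is_online) in lvs.items():
    total_mib += float(size)
    if is_inactive or not is_online:
      needs_attention.append(name)
  return total_mib, needs_attention

total, bad = summarize_lvs(lv_data)
# total == 122.46, bad == ["test1"] (inactive in this sample data)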
@type hypervisor_list: list @param hypervisor_list: list of hypervisors to query for instance data - @rtype: dict of dicts + @rtype: dict @return: dictionary of instance: data, with data having the following keys: - memory: memory size of instance (int) - state: xen state of instance (string) - time: cpu time of instance (float) - - vcpuus: the number of vcpus + - vcpus: the number of vcpus """ output = {} @@ -521,80 +673,79 @@ def AddOSToInstance(instance): @type instance: L{objects.Instance} @param instance: Instance whose OS is to be installed + @rtype: boolean + @return: the success of the operation """ inst_os = OSFromDisk(instance.os) - create_script = inst_os.create_script create_env = OSEnvironment(instance) logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os, instance.name, int(time.time())) - if not os.path.exists(constants.LOG_OS_DIR): - os.mkdir(constants.LOG_OS_DIR, 0750) - - command = utils.BuildShellCmd("cd %s && %s &>%s", - inst_os.path, create_script, logfile) - result = utils.RunCmd(command, env=create_env) + result = utils.RunCmd([inst_os.create_script], env=create_env, + cwd=inst_os.path, output=logfile,) if result.failed: logging.error("os create command '%s' returned error: %s, logfile: %s," - " output: %s", command, result.fail_reason, logfile, + " output: %s", result.cmd, result.fail_reason, logfile, result.output) - return False + lines = [val.encode("string_escape") + for val in utils.TailFile(logfile, lines=20)] + return (False, "OS create script failed (%s), last lines in the" + " log file:\n%s" % (result.fail_reason, "\n".join(lines))) - return True + return (True, "Successfully installed") def RunRenameInstance(instance, old_name): """Run the OS rename script for an instance. - @type instance: objects.Instance + @type instance: L{objects.Instance} @param instance: Instance whose OS is to be installed @type old_name: string @param old_name: previous instance name + @rtype: boolean + @return: the success of the operation """ inst_os = OSFromDisk(instance.os) - script = inst_os.rename_script rename_env = OSEnvironment(instance) rename_env['OLD_INSTANCE_NAME'] = old_name logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os, old_name, instance.name, int(time.time())) - if not os.path.exists(constants.LOG_OS_DIR): - os.mkdir(constants.LOG_OS_DIR, 0750) - - command = utils.BuildShellCmd("cd %s && %s &>%s", - inst_os.path, script, logfile) - result = utils.RunCmd(command, env=rename_env) + result = utils.RunCmd([inst_os.rename_script], env=rename_env, + cwd=inst_os.path, output=logfile) if result.failed: logging.error("os create command '%s' returned error: %s output: %s", - command, result.fail_reason, result.output) - return False + result.cmd, result.fail_reason, result.output) + lines = [val.encode("string_escape") + for val in utils.TailFile(logfile, lines=20)] + return (False, "OS rename script failed (%s), last lines in the" + " log file:\n%s" % (result.fail_reason, "\n".join(lines))) - return True + return (True, "Rename successful") def _GetVGInfo(vg_name): """Get informations about the volume group. 
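AddOSToInstance and RunRenameInstance above now return a (success, message) pair and, on failure, embed the tail of the OS script log in the message. A standalone sketch of that reporting pattern, reading the file directly instead of using ganeti's utils.TailFile:

def tail_lines(path, count=20):
  try:
    lines = open(path).readlines()
  except EnvironmentError:
    return []
  return [line.rstrip("\n") for line in lines[-count:]]

def report_script_failure(fail_reason, logfile):
  last = [val.encode("string_escape") for val in tail_lines(logfile)]
  return (False, "OS script failed (%s), last lines in the log file:\n%s" %
          (fail_reason, "\n".join(last)))

# report_script_failure("exited with exit code 1", "/tmp/add-os-example.log")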
- Args: - vg_name: the volume group - - Returns: - { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx } - where - vg_size is the total size of the volume group in MiB - vg_free is the free size of the volume group in MiB - pv_count are the number of physical disks in that vg + @type vg_name: str + @param vg_name: the volume group which we query + @rtype: dict + @return: + A dictionary with the following keys: + - C{vg_size} is the total size of the volume group in MiB + - C{vg_free} is the free size of the volume group in MiB + - C{pv_count} are the number of physical disks in that VG - If an error occurs during gathering of data, we return the same dict - with keys all set to None. + If an error occurs during gathering of data, we return the same dict + with keys all set to None. """ retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"]) @@ -621,28 +772,86 @@ def _GetVGInfo(vg_name): return retdic -def _GatherBlockDevs(instance): +def _GetBlockDevSymlinkPath(instance_name, idx): + return os.path.join(constants.DISK_LINKS_DIR, + "%s:%d" % (instance_name, idx)) + + +def _SymlinkBlockDev(instance_name, device_path, idx): + """Set up symlinks to a instance's block device. + + This is an auxiliary function run when an instance is start (on the primary + node) or when an instance is migrated (on the target node). + + + @param instance_name: the name of the target instance + @param device_path: path of the physical block device, on the node + @param idx: the disk index + @return: absolute path to the disk's symlink + + """ + link_name = _GetBlockDevSymlinkPath(instance_name, idx) + try: + os.symlink(device_path, link_name) + except OSError, err: + if err.errno == errno.EEXIST: + if (not os.path.islink(link_name) or + os.readlink(link_name) != device_path): + os.remove(link_name) + os.symlink(device_path, link_name) + else: + raise + + return link_name + + +def _RemoveBlockDevLinks(instance_name, disks): + """Remove the block device symlinks belonging to the given instance. + + """ + for idx, disk in enumerate(disks): + link_name = _GetBlockDevSymlinkPath(instance_name, idx) + if os.path.islink(link_name): + try: + os.remove(link_name) + except OSError: + logging.exception("Can't remove symlink '%s'", link_name) + + +def _GatherAndLinkBlockDevs(instance): """Set up an instance's block device(s). This is run on the primary node at instance startup. The block devices must be already assembled. + @type instance: L{objects.Instance} + @param instance: the instance whose disks we shoul assemble + @rtype: list + @return: list of (disk_object, device_path) + """ block_devices = [] - for disk in instance.disks: + for idx, disk in enumerate(instance.disks): device = _RecursiveFindBD(disk) if device is None: raise errors.BlockDeviceError("Block device '%s' is not set up." % str(disk)) device.Open() - block_devices.append((disk, device)) + try: + link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx) + except OSError, e: + raise errors.BlockDeviceError("Cannot create block device symlink: %s" % + e.strerror) + + block_devices.append((disk, link_name)) + return block_devices def StartInstance(instance, extra_args): """Start an instance. 
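The block device symlinks handled by _GetBlockDevSymlinkPath and _SymlinkBlockDev above are named "instance_name:disk_index". A minimal standalone version of the create-or-refresh logic, with a stand-in directory instead of constants.DISK_LINKS_DIR:

import errno
import os

DISK_LINKS_DIR = "/tmp/ganeti-disk-links"  # stand-in for constants.DISK_LINKS_DIR

def link_path(instance_name, idx):
  return os.path.join(DISK_LINKS_DIR, "%s:%d" % (instance_name, idx))

def symlink_disk(instance_name, device_path, idx):
  name = link_path(instance_name, idx)
  try:
    os.symlink(device_path, name)
  except OSError, err:
    if err.errno != errno.EEXIST:
      raise
    # a link already exists; refresh it if it points somewhere else
    if not os.path.islink(name) or os.readlink(name) != device_path:
      os.remove(name)
      os.symlink(device_path, name)
  return name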
- @type instance: instance object + @type instance: L{objects.Instance} @param instance: the instance object @rtype: boolean @return: whether the startup was successful or not @@ -651,24 +860,29 @@ def StartInstance(instance, extra_args): running_instances = GetInstanceList([instance.hypervisor]) if instance.name in running_instances: - return True - - block_devices = _GatherBlockDevs(instance) - hyper = hypervisor.GetHypervisor(instance.hypervisor) + return (True, "Already running") try: + block_devices = _GatherAndLinkBlockDevs(instance) + hyper = hypervisor.GetHypervisor(instance.hypervisor) hyper.StartInstance(instance, block_devices, extra_args) + except errors.BlockDeviceError, err: + logging.exception("Failed to start instance") + return (False, "Block device error: %s" % str(err)) except errors.HypervisorError, err: logging.exception("Failed to start instance") - return False + _RemoveBlockDevLinks(instance.name, instance.disks) + return (False, "Hypervisor error: %s" % str(err)) - return True + return (True, "Instance started successfully") def ShutdownInstance(instance): """Shut an instance down. - @type instance: instance object + @note: this functions uses polling with a hardcoded timeout. + + @type instance: L{objects.Instance} @param instance: the instance object @rtype: boolean @return: whether the startup was successful or not @@ -684,11 +898,10 @@ def ShutdownInstance(instance): try: hyper.StopInstance(instance) except errors.HypervisorError, err: - logging.error("Failed to stop instance") + logging.error("Failed to stop instance: %s" % err) return False # test every 10secs for 2min - shutdown_ok = False time.sleep(1) for dummy in range(11): @@ -697,29 +910,43 @@ def ShutdownInstance(instance): time.sleep(10) else: # the shutdown did not succeed - logging.error("shutdown of '%s' unsuccessful, using destroy", instance) + logging.error("Shutdown of '%s' unsuccessful, using destroy", + instance.name) try: hyper.StopInstance(instance, force=True) except errors.HypervisorError, err: - logging.exception("Failed to stop instance") + logging.exception("Failed to stop instance: %s" % err) return False time.sleep(1) if instance.name in GetInstanceList([hv_name]): - logging.error("could not shutdown instance '%s' even by destroy", + logging.error("Could not shutdown instance '%s' even by destroy", instance.name) return False + _RemoveBlockDevLinks(instance.name, instance.disks) + return True def RebootInstance(instance, reboot_type, extra_args): """Reboot an instance. - Args: - instance - name of instance to reboot - reboot_type - how to reboot [soft,hard,full] + @type instance: L{objects.Instance} + @param instance: the instance object to reboot + @type reboot_type: str + @param reboot_type: the type of reboot, one the following + constants: + - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the + instance OS, do not recreate the VM + - L{constants.INSTANCE_REBOOT_HARD}: tear down and + restart the VM (at the hypervisor level) + - the other reboot type (L{constants.INSTANCE_REBOOT_HARD}) + is not accepted here, since that mode is handled + differently + @rtype: boolean + @return: the success of the operation """ running_instances = GetInstanceList([instance.hypervisor]) @@ -748,10 +975,69 @@ def RebootInstance(instance, reboot_type, extra_args): return True +def MigrationInfo(instance): + """Gather information about an instance to be migrated. 
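ShutdownInstance above asks the hypervisor for a clean stop, polls for roughly two minutes and only then falls back to a forced destroy. A generic sketch of that pattern, assuming a hypervisor object with the ListInstances/StopInstance methods used in the code above:

import time

def soft_then_hard_stop(hyper, instance, poll_interval=10, polls=12):
  hyper.StopInstance(instance)
  for _dummy in range(polls):
    if instance.name not in hyper.ListInstances():
      return True
    time.sleep(poll_interval)
  # the graceful shutdown did not finish in time, destroy the domain
  hyper.StopInstance(instance, force=True)
  time.sleep(1)
  return instance.name not in hyper.ListInstances()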
+ + @type instance: L{objects.Instance} + @param instance: the instance definition + + """ + hyper = hypervisor.GetHypervisor(instance.hypervisor) + try: + info = hyper.MigrationInfo(instance) + except errors.HypervisorError, err: + msg = "Failed to fetch migration information" + logging.exception(msg) + return (False, '%s: %s' % (msg, err)) + return (True, info) + + +def AcceptInstance(instance, info, target): + """Prepare the node to accept an instance. + + @type instance: L{objects.Instance} + @param instance: the instance definition + @type info: string/data (opaque) + @param info: migration information, from the source node + @type target: string + @param target: target host (usually ip), on this node + + """ + hyper = hypervisor.GetHypervisor(instance.hypervisor) + try: + hyper.AcceptInstance(instance, info, target) + except errors.HypervisorError, err: + msg = "Failed to accept instance" + logging.exception(msg) + return (False, '%s: %s' % (msg, err)) + return (True, "Accept successfull") + + +def FinalizeMigration(instance, info, success): + """Finalize any preparation to accept an instance. + + @type instance: L{objects.Instance} + @param instance: the instance definition + @type info: string/data (opaque) + @param info: migration information, from the source node + @type success: boolean + @param success: whether the migration was a success or a failure + + """ + hyper = hypervisor.GetHypervisor(instance.hypervisor) + try: + hyper.FinalizeMigration(instance, info, success) + except errors.HypervisorError, err: + msg = "Failed to finalize migration" + logging.exception(msg) + return (False, '%s: %s' % (msg, err)) + return (True, "Migration Finalized") + + def MigrateInstance(instance, target, live): """Migrates an instance to another node. - @type instance: C{objects.Instance} + @type instance: L{objects.Instance} @param instance: the instance definition @type target: string @param target: the target node name @@ -764,31 +1050,36 @@ def MigrateInstance(instance, target, live): - msg is a string with details in case of failure """ - hyper = hypervisor.GetHypervisor(instance.hypervisor_name) + hyper = hypervisor.GetHypervisor(instance.hypervisor) try: hyper.MigrateInstance(instance.name, target, live) except errors.HypervisorError, err: - msg = "Failed to migrate instance: %s" % str(err) - logging.error(msg) - return (False, msg) + msg = "Failed to migrate instance" + logging.exception(msg) + return (False, "%s: %s" % (msg, err)) return (True, "Migration successfull") def CreateBlockDevice(disk, size, owner, on_primary, info): """Creates a block device for an instance. - Args: - disk: a ganeti.objects.Disk object - size: the size of the physical underlying device - owner: a string with the name of the instance - on_primary: a boolean indicating if it is the primary node or not - info: string that will be sent to the physical device creation - - Returns: - the new unique_id of the device (this can sometime be - computed only after creation), or None. On secondary nodes, - it's not required to return anything. 
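MigrationInfo, AcceptInstance, FinalizeMigration and MigrateInstance above all follow the (success, payload) convention. The sketch below shows one plausible order in which master-side logic could chain them; the source_rpc/target_rpc objects are hypothetical stand-ins for the RPC layer that runs these functions on the remote nodes.

def migrate(instance, source_rpc, target_rpc, target, live):
  ok, info = source_rpc.MigrationInfo(instance)
  if not ok:
    return (False, info)
  ok, msg = target_rpc.AcceptInstance(instance, info, target)
  if not ok:
    return (False, msg)
  ok, msg = source_rpc.MigrateInstance(instance, target, live)
  # let the target clean up (on failure) or finish activation (on success)
  target_rpc.FinalizeMigration(instance, info, ok)
  return (ok, msg)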
+ @type disk: L{objects.Disk} + @param disk: the object describing the disk we should create + @type size: int + @param size: the size of the physical underlying device, in MiB + @type owner: str + @param owner: the name of the instance for which disk is created, + used for device cache data + @type on_primary: boolean + @param on_primary: indicates if it is the primary node or not + @type info: string + @param info: string that will be sent to the physical device + creation, used for example to set (LVM) tags on LVs + + @return: the new unique_id of the device (this can sometime be + computed only after creation), or None. On secondary nodes, + it's not required to return anything. """ clist = [] @@ -800,25 +1091,17 @@ def CreateBlockDevice(disk, size, owner, on_primary, info): # be assembled crdev.Open() clist.append(crdev) + try: - device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist) - if device is not None: - logging.info("removing existing device %s", disk) - device.Remove() - except errors.BlockDeviceError, err: - pass + device = bdev.Create(disk.dev_type, disk.physical_id, clist, size) + except errors.GenericError, err: + return False, "Can't create block device: %s" % str(err) - device = bdev.Create(disk.dev_type, disk.physical_id, - clist, size) - if device is None: - raise ValueError("Can't create child device for %s, %s" % - (disk, size)) if on_primary or disk.AssembleOnSecondary(): if not device.Assemble(): - errorstring = "Can't assemble device after creation" + errorstring = "Can't assemble device after creation, very unusual event" logging.error(errorstring) - raise errors.BlockDeviceError("%s, very unusual event - check the node" - " daemon logs" % errorstring) + return False, errorstring device.SetSyncSpeed(constants.SYNC_SPEED) if on_primary or disk.OpenOnSecondary(): device.Open(force=True) @@ -828,19 +1111,22 @@ def CreateBlockDevice(disk, size, owner, on_primary, info): device.SetInfo(info) physical_id = device.unique_id - return physical_id + return True, physical_id def RemoveBlockDevice(disk): """Remove a block device. - This is intended to be called recursively. + @note: This is intended to be called recursively. + + @type disk: L{objects.Disk} + @param disk: the disk object we should remove + @rtype: boolean + @return: the success of the operation """ try: - # since we are removing the device, allow a partial match - # this allows removal of broken mirrors - rdev = _RecursiveFindBD(disk, allow_partial=True) + rdev = _RecursiveFindBD(disk) except errors.BlockDeviceError, err: # probably can't attach logging.info("Can't attach to device %s in remove", disk) @@ -863,16 +1149,21 @@ def _RecursiveAssembleBD(disk, owner, as_primary): This is run on the primary and secondary nodes for an instance. - This function is called recursively. - - Args: - disk: a objects.Disk object - as_primary: if we should make the block device read/write - - Returns: - the assembled device or None (in case no device was assembled) + @note: this function is called recursively. - If the assembly is not successful, an exception is raised. 
+ @type disk: L{objects.Disk} + @param disk: the disk we try to assemble + @type owner: str + @param owner: the name of the instance which owns the disk + @type as_primary: boolean + @param as_primary: if we should make the block device + read/write + + @return: the assembled device or None (in case no device + was assembled) + @raise errors.BlockDeviceError: in case there is an error + during the activation of the children or the device + itself """ children = [] @@ -893,7 +1184,7 @@ def _RecursiveAssembleBD(disk, owner, as_primary): children.append(cdev) if as_primary or disk.AssembleOnSecondary(): - r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children) + r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children) r_dev.SetSyncSpeed(constants.SYNC_SPEED) result = r_dev if as_primary or disk.OpenOnSecondary(): @@ -911,9 +1202,9 @@ def AssembleBlockDevice(disk, owner, as_primary): This is a wrapper over _RecursiveAssembleBD. - Returns: - a /dev path for primary nodes - True for secondary nodes + @rtype: str or boolean + @return: a C{/dev/...} path for primary nodes, and + C{True} for secondary nodes """ result = _RecursiveAssembleBD(disk, owner, as_primary) @@ -925,13 +1216,20 @@ def AssembleBlockDevice(disk, owner, as_primary): def ShutdownBlockDevice(disk): """Shut down a block device. - First, if the device is assembled (can `Attach()`), then the device - is shutdown. Then the children of the device are shutdown. + First, if the device is assembled (Attach() is successfull), then + the device is shutdown. Then the children of the device are + shutdown. This function is called recursively. Note that we don't cache the children or such, as oppossed to assemble, shutdown of different devices doesn't require that the upper device was active. + @type disk: L{objects.Disk} + @param disk: the description of the disk we should + shutdown + @rtype: boolean + @return: the success of the operation + """ r_dev = _RecursiveFindBD(disk) if r_dev is not None: @@ -950,8 +1248,15 @@ def ShutdownBlockDevice(disk): def MirrorAddChildren(parent_cdev, new_cdevs): """Extend a mirrored block device. + @type parent_cdev: L{objects.Disk} + @param parent_cdev: the disk to which we should add children + @type new_cdevs: list of L{objects.Disk} + @param new_cdevs: the list of children which we should add + @rtype: boolean + @return: the success of the operation + """ - parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True) + parent_bdev = _RecursiveFindBD(parent_cdev) if parent_bdev is None: logging.error("Can't find parent device") return False @@ -967,6 +1272,13 @@ def MirrorAddChildren(parent_cdev, new_cdevs): def MirrorRemoveChildren(parent_cdev, new_cdevs): """Shrink a mirrored block device. + @type parent_cdev: L{objects.Disk} + @param parent_cdev: the disk from which we should remove children + @type new_cdevs: list of L{objects.Disk} + @param new_cdevs: the list of children which we should remove + @rtype: boolean + @return: the success of the operation + """ parent_bdev = _RecursiveFindBD(parent_cdev) if parent_bdev is None: @@ -992,12 +1304,14 @@ def MirrorRemoveChildren(parent_cdev, new_cdevs): def GetMirrorStatus(disks): """Get the mirroring status of a list of devices. 
- Args: - disks: list of `objects.Disk` - - Returns: - list of (mirror_done, estimated_time) tuples, which - are the result of bdev.BlockDevice.CombinedSyncStatus() + @type disks: list of L{objects.Disk} + @param disks: the list of disks which we should query + @rtype: disk + @return: + a list of (mirror_done, estimated_time) tuples, which + are the result of L{bdev.BlockDev.CombinedSyncStatus} + @raise errors.BlockDeviceError: if any of the disks cannot be + found """ stats = [] @@ -1009,20 +1323,16 @@ def GetMirrorStatus(disks): return stats -def _RecursiveFindBD(disk, allow_partial=False): +def _RecursiveFindBD(disk): """Check if a device is activated. If so, return informations about the real device. - Args: - disk: the objects.Disk instance - allow_partial: don't abort the find if a child of the - device can't be found; this is intended to be - used when repairing mirrors + @type disk: L{objects.Disk} + @param disk: the disk object we need to find - Returns: - None if the device can't be found - otherwise the device instance + @return: None if the device can't be found, + otherwise the device instance """ children = [] @@ -1036,13 +1346,14 @@ def _RecursiveFindBD(disk, allow_partial=False): def FindBlockDevice(disk): """Check if a device is activated. - If so, return informations about the real device. + If it is, return informations about the real device. - Args: - disk: the objects.Disk instance - Returns: - None if the device can't be found - (device_path, major, minor, sync_percent, estimated_time, is_degraded) + @type disk: L{objects.Disk} + @param disk: the disk to find + @rtype: None or tuple + @return: None if the disk cannot be found, otherwise a + tuple (device_path, major, minor, sync_percent, + estimated_time, is_degraded) """ rbd = _RecursiveFindBD(disk) @@ -1057,6 +1368,24 @@ def UploadFile(file_name, data, mode, uid, gid, atime, mtime): This allows the master to overwrite(!) a file. It will only perform the operation if the file belongs to a list of configuration files. + @type file_name: str + @param file_name: the target file name + @type data: str + @param data: the new contents of the file + @type mode: int + @param mode: the mode to give the file (can be None) + @type uid: int + @param uid: the owner of the file (can be -1 for default) + @type gid: int + @param gid: the group of the file (can be -1 for default) + @type atime: float + @param atime: the atime to set on the file (can be None) + @type mtime: float + @param mtime: the mtime to set on the file (can be None) + @rtype: boolean + @return: the success of the operation; errors are logged + in the node daemon log + """ if not os.path.isabs(file_name): logging.error("Filename passed to UploadFile is not absolute: '%s'", @@ -1075,17 +1404,31 @@ def UploadFile(file_name, data, mode, uid, gid, atime, mtime): " upload targets: '%s'", file_name) return False - utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid, + raw_data = _Decompress(data) + + utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid, atime=atime, mtime=mtime) return True +def WriteSsconfFiles(values): + """Update all ssconf files. + + Wrapper around the SimpleStore.WriteFiles. + + """ + ssconf.SimpleStore().WriteFiles(values) + + def _ErrnoOrStr(err): """Format an EnvironmentError exception. - If the `err` argument has an errno attribute, it will be looked up - and converted into a textual EXXXX description. Otherwise the string - representation of the error will be returned. 
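With this change UploadFile above no longer receives a raw string: the data argument is the (encoding, content) pair that _Decompress unpacks. An illustrative caller-side snippet, again with a placeholder encoding constant:

import base64
import zlib

RPC_ENCODING_ZLIB_BASE64 = 1    # placeholder value

contents = open("/etc/hosts").read()
packed = (RPC_ENCODING_ZLIB_BASE64, base64.b64encode(zlib.compress(contents)))
# UploadFile("/etc/hosts", packed, mode=0644, uid=0, gid=0,
#            atime=None, mtime=None)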
+ If the L{err} argument has an errno attribute, it will be looked up + and converted into a textual C{E...} description. Otherwise the + string representation of the error will be returned. + + @type err: L{EnvironmentError} + @param err: the exception to format """ if hasattr(err, 'errno'): @@ -1098,11 +1441,18 @@ def _ErrnoOrStr(err): def _OSOndiskVersion(name, os_dir): """Compute and return the API version of a given OS. - This function will try to read the API version of the os given by + This function will try to read the API version of the OS given by the 'name' parameter and residing in the 'os_dir' directory. - Return value will be either an integer denoting the version or None in the - case when this is not a valid OS name. + @type name: str + @param name: the OS name we should look for + @type os_dir: str + @param os_dir: the directory inwhich we should look for the OS + @rtype: int or None + @return: + Either an integer denoting the version or None in the + case when this is not a valid OS name. + @raise errors.InvalidOS: if the OS cannot be found """ api_file = os.path.sep.join([os_dir, "ganeti_api_version"]) @@ -1140,11 +1490,13 @@ def _OSOndiskVersion(name, os_dir): def DiagnoseOS(top_dirs=None): """Compute the validity for all OSes. - Returns an OS object for each name in all the given top directories - (if not given defaults to constants.OS_SEARCH_PATH) - - Returns: - list of OS objects + @type top_dirs: list + @param top_dirs: the list of directories in which to + search (if not given defaults to + L{constants.OS_SEARCH_PATH}) + @rtype: list of L{objects.OS} + @return: an OS object for each name in all the given + directories """ if top_dirs is None: @@ -1173,15 +1525,16 @@ def OSFromDisk(name, base_dir=None): This function will return an OS instance if the given name is a valid OS name. Otherwise, it will raise an appropriate - `errors.InvalidOS` exception, detailing why this is not a valid - OS. + L{errors.InvalidOS} exception, detailing why this is not a valid OS. @type base_dir: string @keyword base_dir: Base directory containing OS installations. Defaults to a search in all the OS_SEARCH_PATH dirs. + @rtype: L{objects.OS} + @return: the OS instance if we find a valid one + @raise errors.InvalidOS: if we don't find a valid OS """ - if base_dir is None: os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir) if os_dir is None: @@ -1227,12 +1580,14 @@ def OSFromDisk(name, base_dir=None): def OSEnvironment(instance, debug=0): """Calculate the environment for an os script. - @type instance: instance object + @type instance: L{objects.Instance} @param instance: target instance for the os script run @type debug: integer - @param debug: debug level (0 or 1, for os api 10) + @param debug: debug level (0 or 1, for OS Api 10) @rtype: dict @return: dict of environment variables + @raise errors.BlockDeviceError: if the block device + cannot be found """ result = {} @@ -1274,14 +1629,14 @@ def GrowBlockDevice(disk, amount): """Grow a stack of block devices. This function is called recursively, with the childrens being the - first one resize. + first ones to resize. 
- Args: - disk: the disk to be grown - - Returns: a tuple of (status, result), with: - status: the result (true/false) of the operation - result: the error message if the operation failed, otherwise not used + @type disk: L{objects.Disk} + @param disk: the disk to be grown + @rtype: (status, result) + @return: a tuple with the status of the operation + (True/False), and the errors message if status + is False """ r_dev = _RecursiveFindBD(disk) @@ -1331,21 +1686,25 @@ def SnapshotBlockDevice(disk): (disk.unique_id, disk.dev_type)) -def ExportSnapshot(disk, dest_node, instance, cluster_name): +def ExportSnapshot(disk, dest_node, instance, cluster_name, idx): """Export a block device snapshot to a remote node. - Args: - disk: the snapshot block device - dest_node: the node to send the image to - instance: instance being exported - - Returns: - True if successful, False otherwise. + @type disk: L{objects.Disk} + @param disk: the description of the disk to export + @type dest_node: str + @param dest_node: the destination node to export to + @type instance: L{objects.Instance} + @param instance: the instance object to whom the disk belongs + @type cluster_name: str + @param cluster_name: the cluster name, needed for SSH hostalias + @type idx: int + @param idx: the index of the disk in the instance's disk list, + used to export to the OS scripts environment + @rtype: boolean + @return: the success of the operation """ - # TODO(ultrotter): Import/Export still to be converted to OS API 10 - logging.error("Import/Export still to be converted to OS API 10") - return False + export_env = OSEnvironment(instance) inst_os = OSFromDisk(instance.os) export_script = inst_os.export_script @@ -1354,12 +1713,14 @@ def ExportSnapshot(disk, dest_node, instance, cluster_name): instance.name, int(time.time())) if not os.path.exists(constants.LOG_OS_DIR): os.mkdir(constants.LOG_OS_DIR, 0750) - - real_os_dev = _RecursiveFindBD(disk) - if real_os_dev is None: + real_disk = _RecursiveFindBD(disk) + if real_disk is None: raise errors.BlockDeviceError("Block device '%s' is not set up" % str(disk)) - real_os_dev.Open() + real_disk.Open() + + export_env['EXPORT_DEVICE'] = real_disk.dev_path + export_env['EXPORT_INDEX'] = str(idx) destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new") destfile = disk.physical_id[1] @@ -1367,10 +1728,8 @@ def ExportSnapshot(disk, dest_node, instance, cluster_name): # the target command is built out of three individual commands, # which are joined by pipes; we check each individual command for # valid parameters - - expcmd = utils.BuildShellCmd("cd %s; %s -i %s -b %s 2>%s", inst_os.path, - export_script, instance.name, - real_os_dev.dev_path, logfile) + expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path, + export_script, logfile) comprcmd = "gzip" @@ -1383,7 +1742,7 @@ def ExportSnapshot(disk, dest_node, instance, cluster_name): # all commands have been checked, so we're safe to combine them command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)]) - result = utils.RunCmd(command) + result = utils.RunCmd(command, env=export_env) if result.failed: logging.error("os snapshot export command '%s' returned error: %s" @@ -1396,12 +1755,15 @@ def ExportSnapshot(disk, dest_node, instance, cluster_name): def FinalizeExport(instance, snap_disks): """Write out the export configuration information. 
- Args: - instance: instance configuration - snap_disks: snapshot block devices + @type instance: L{objects.Instance} + @param instance: the instance which we export, used for + saving configuration + @type snap_disks: list of L{objects.Disk} + @param snap_disks: list of snapshot block devices, which + will be used to get the actual name of the dump file - Returns: - False in case of error, True otherwise. + @rtype: boolean + @return: the success of the operation """ destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new") @@ -1424,34 +1786,32 @@ def FinalizeExport(instance, snap_disks): instance.beparams[constants.BE_VCPUS]) config.set(constants.INISECT_INS, 'disk_template', instance.disk_template) - nic_count = 0 + nic_total = 0 for nic_count, nic in enumerate(instance.nics): + nic_total += 1 config.set(constants.INISECT_INS, 'nic%d_mac' % nic_count, '%s' % nic.mac) config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip) config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count, '%s' % nic.bridge) # TODO: redundant: on load can read nics until it doesn't exist - config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count) + config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total) - disk_count = 0 + disk_total = 0 for disk_count, disk in enumerate(snap_disks): if disk: + disk_total += 1 config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count, ('%s' % disk.iv_name)) config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count, ('%s' % disk.physical_id[1])) config.set(constants.INISECT_INS, 'disk%d_size' % disk_count, ('%d' % disk.size)) - config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_count) - cff = os.path.join(destdir, constants.EXPORT_CONF_FILE) - cfo = open(cff, 'w') - try: - config.write(cfo) - finally: - cfo.close() + config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total) + utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE), + data=config.Dumps()) shutil.rmtree(finaldestdir, True) shutil.move(destdir, finaldestdir) @@ -1461,11 +1821,12 @@ def FinalizeExport(instance, snap_disks): def ExportInfo(dest): """Get export configuration information. - Args: - dest: directory containing the export + @type dest: str + @param dest: directory containing the export - Returns: - A serializable config file containing the export info. + @rtype: L{objects.SerializableConfigParser} + @return: a serializable config file containing the + export info """ cff = os.path.join(dest, constants.EXPORT_CONF_FILE) @@ -1480,82 +1841,62 @@ def ExportInfo(dest): return config -def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image, - cluster_name): +def ImportOSIntoInstance(instance, src_node, src_images, cluster_name): """Import an os image into an instance. - Args: - instance: the instance object - os_disk: the instance-visible name of the os device - swap_disk: the instance-visible name of the swap device - src_node: node holding the source image - src_image: path to the source image on src_node - - Returns: - False in case of error, True otherwise. 
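The switch in FinalizeExport above from reusing the enumerate() counter to separate nic_total/disk_total counters fixes an off-by-one: the loop variable ends up holding the last index rather than the number of items, and snapshot lists can contain None placeholders that must not be counted. A tiny demonstration:

snap_disks = ["disk0-dump", None, "disk2-dump"]

disk_count = 0
for disk_count, disk in enumerate(snap_disks):
  pass
# disk_count is now 2: the last index, not the number of entries

disk_total = 0
for idx, disk in enumerate(snap_disks):
  if disk:
    disk_total += 1
# disk_total is 2, counting only the real dumps; with three real entries the
# old code would still have written 2 while disk_total would be 3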
+ @type instance: L{objects.Instance} + @param instance: instance to import the disks into + @type src_node: string + @param src_node: source node for the disk images + @type src_images: list of string + @param src_images: absolute paths of the disk images + @rtype: list of boolean + @return: each boolean represent the success of importing the n-th disk """ - # TODO(ultrotter): Import/Export still to be converted to OS API 10 - logging.error("Import/Export still to be converted to OS API 10") - return False - + import_env = OSEnvironment(instance) inst_os = OSFromDisk(instance.os) import_script = inst_os.import_script - os_device = instance.FindDisk(os_disk) - if os_device is None: - logging.error("Can't find this device-visible name '%s'", os_disk) - return False - - swap_device = instance.FindDisk(swap_disk) - if swap_device is None: - logging.error("Can't find this device-visible name '%s'", swap_disk) - return False - - real_os_dev = _RecursiveFindBD(os_device) - if real_os_dev is None: - raise errors.BlockDeviceError("Block device '%s' is not set up" % - str(os_device)) - real_os_dev.Open() - - real_swap_dev = _RecursiveFindBD(swap_device) - if real_swap_dev is None: - raise errors.BlockDeviceError("Block device '%s' is not set up" % - str(swap_device)) - real_swap_dev.Open() - logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os, instance.name, int(time.time())) if not os.path.exists(constants.LOG_OS_DIR): os.mkdir(constants.LOG_OS_DIR, 0750) - destcmd = utils.BuildShellCmd('cat %s', src_image) - remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node, - constants.GANETI_RUNAS, - destcmd) - comprcmd = "gunzip" - impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)", - inst_os.path, import_script, instance.name, - real_os_dev.dev_path, real_swap_dev.dev_path, - logfile) - - command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd]) - env = {'HYPERVISOR': instance.hypervisor} - - result = utils.RunCmd(command, env=env) - - if result.failed: - logging.error("os import command '%s' returned error: %s" - " output: %s", command, result.fail_reason, result.output) - return False + impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path, + import_script, logfile) + + final_result = [] + for idx, image in enumerate(src_images): + if image: + destcmd = utils.BuildShellCmd('cat %s', image) + remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node, + constants.GANETI_RUNAS, + destcmd) + command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd]) + import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx] + import_env['IMPORT_INDEX'] = str(idx) + result = utils.RunCmd(command, env=import_env) + if result.failed: + logging.error("Disk import command '%s' returned error: %s" + " output: %s", command, result.fail_reason, + result.output) + final_result.append(False) + else: + final_result.append(True) + else: + final_result.append(True) - return True + return final_result def ListExports(): """Return a list of exports currently available on this machine. + @rtype: list + @return: list of the exports + """ if os.path.isdir(constants.EXPORT_DIR): return utils.ListVisibleFiles(constants.EXPORT_DIR) @@ -1566,11 +1907,10 @@ def ListExports(): def RemoveExport(export): """Remove an existing export from the node. - Args: - export: the name of the export to remove - - Returns: - False in case of error, True otherwise. 
+ @type export: str + @param export: the name of the export to remove + @rtype: boolean + @return: the success of the operation """ target = os.path.join(constants.EXPORT_DIR, export) @@ -1585,9 +1925,14 @@ def RemoveExport(export): def RenameBlockDevices(devlist): """Rename a list of block devices. - The devlist argument is a list of tuples (disk, new_logical, - new_physical). The return value will be a combined boolean result - (True only if all renames succeeded). + @type devlist: list of tuples + @param devlist: list of tuples of the form (disk, + new_logical_id, new_physical_id); disk is an + L{objects.Disk} object describing the current disk, + and new logical_id/physical_id is the name we + rename it to + @rtype: boolean + @return: True if all renames succeeded, False otherwise """ result = True @@ -1620,11 +1965,10 @@ def _TransformFileStorageDir(file_storage_dir): default file_storage_dir stored in SimpleStore. Only paths under that directory are allowed. - Args: - file_storage_dir: string with path + @type file_storage_dir: str + @param file_storage_dir: the path to check - Returns: - normalized file_storage_dir (string) if valid, None otherwise + @return: the normalized path if valid, None otherwise """ cfg = _GetConfig() @@ -1642,12 +1986,12 @@ def _TransformFileStorageDir(file_storage_dir): def CreateFileStorageDir(file_storage_dir): """Create file storage directory. - Args: - file_storage_dir: string containing the path + @type file_storage_dir: str + @param file_storage_dir: directory to create - Returns: - tuple with first element a boolean indicating wheter dir - creation was successful or not + @rtype: tuple + @return: tuple with first element a boolean indicating wheter dir + creation was successful or not """ file_storage_dir = _TransformFileStorageDir(file_storage_dir) @@ -1674,12 +2018,11 @@ def RemoveFileStorageDir(file_storage_dir): Remove it only if it's empty. If not log an error and return. - Args: - file_storage_dir: string containing the path - - Returns: - tuple with first element a boolean indicating wheter dir - removal was successful or not + @type file_storage_dir: str + @param file_storage_dir: the directory we should cleanup + @rtype: tuple (success,) + @return: tuple of one element, C{success}, denoting + whether the operation was successfull """ file_storage_dir = _TransformFileStorageDir(file_storage_dir) @@ -1704,13 +2047,13 @@ def RemoveFileStorageDir(file_storage_dir): def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir): """Rename the file storage directory. - Args: - old_file_storage_dir: string containing the old path - new_file_storage_dir: string containing the new path - - Returns: - tuple with first element a boolean indicating wheter dir - rename was successful or not + @type old_file_storage_dir: str + @param old_file_storage_dir: the current path + @type new_file_storage_dir: str + @param new_file_storage_dir: the name we should rename to + @rtype: tuple (success,) + @return: tuple of one element, C{success}, denoting + whether the operation was successful """ old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir) @@ -1741,6 +2084,11 @@ def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir): def _IsJobQueueFile(file_name): """Checks whether the given filename is in the queue directory. 
+ @type file_name: str + @param file_name: the file name we should check + @rtype: boolean + @return: whether the file is under the queue directory + """ queue_dir = os.path.normpath(constants.QUEUE_DIR) result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir) @@ -1755,12 +2103,22 @@ def _IsJobQueueFile(file_name): def JobQueueUpdate(file_name, content): """Updates a file in the queue directory. + This is just a wrapper over L{utils.WriteFile}, with proper + checking. + + @type file_name: str + @param file_name: the job file name + @type content: str + @param content: the new job contents + @rtype: boolean + @return: the success of the operation + """ if not _IsJobQueueFile(file_name): return False # Write and replace the file atomically - utils.WriteFile(file_name, data=content) + utils.WriteFile(file_name, data=_Decompress(content)) return True @@ -1768,11 +2126,20 @@ def JobQueueUpdate(file_name, content): def JobQueueRename(old, new): """Renames a job queue file. + This is just a wrapper over os.rename with proper checking. + + @type old: str + @param old: the old (actual) file name + @type new: str + @param new: the desired file name + @rtype: boolean + @return: the success of the operation + """ if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)): return False - os.rename(old, new) + utils.RenameFile(old, new, mkdir=True) return True @@ -1782,8 +2149,11 @@ def JobQueueSetDrainFlag(drain_flag): This will set or unset the queue drain flag. - @type drain_flag: bool + @type drain_flag: boolean @param drain_flag: if True, will set the drain flag, otherwise reset it. + @rtype: boolean + @return: always True + @warning: the function always returns True """ if drain_flag: @@ -1794,10 +2164,21 @@ def JobQueueSetDrainFlag(drain_flag): return True -def CloseBlockDevices(disks): +def CloseBlockDevices(instance_name, disks): """Closes the given block devices. - This means they will be switched to secondary mode (in case of DRBD). + This means they will be switched to secondary mode (in case of + DRBD). + + @param instance_name: if the argument is not empty, the symlinks + of this instance will be removed + @type disks: list of L{objects.Disk} + @param disks: the list of disks to be closed + @rtype: tuple (success, message) + @return: a tuple of success and message, where success + indicates the succes of the operation, and message + which will contain the error details in case we + failed """ bdevs = [] @@ -1816,6 +2197,8 @@ def CloseBlockDevices(disks): if msg: return (False, "Can't make devices secondary: %s" % ",".join(msg)) else: + if instance_name: + _RemoveBlockDevLinks(instance_name, disks) return (True, "All devices secondary") @@ -1826,8 +2209,11 @@ def ValidateHVParams(hvname, hvparams): @param hvname: the hypervisor name @type hvparams: dict @param hvparams: the hypervisor parameters to be validated - @rtype: tuple (bool, str) - @return: tuple of (success, message) + @rtype: tuple (success, message) + @return: a tuple of success and message, where success + indicates the succes of the operation, and message + which will contain the error details in case we + failed """ try: @@ -1838,11 +2224,150 @@ def ValidateHVParams(hvname, hvparams): return (False, str(err)) +def DemoteFromMC(): + """Demotes the current node from master candidate role. 
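A standalone sketch of the containment test that _IsJobQueueFile above performs before JobQueueUpdate/JobQueueRename touch anything; the queue path used here is a stand-in for constants.QUEUE_DIR.

import os

QUEUE_DIR = "/var/lib/ganeti/queue"   # stand-in for constants.QUEUE_DIR

def is_job_queue_file(file_name):
  queue_dir = os.path.normpath(QUEUE_DIR)
  return os.path.commonprefix([queue_dir, file_name]) == queue_dir

assert is_job_queue_file("/var/lib/ganeti/queue/job-42")
assert not is_job_queue_file("/etc/passwd")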
+ + """ + # try to ensure we're not the master by mistake + master, myself = ssconf.GetMasterAndMyself() + if master == myself: + return (False, "ssconf status shows I'm the master node, will not demote") + pid_file = utils.DaemonPidFileName(constants.MASTERD_PID) + if utils.IsProcessAlive(utils.ReadPidFile(pid_file)): + return (False, "The master daemon is running, will not demote") + try: + utils.CreateBackup(constants.CLUSTER_CONF_FILE) + except EnvironmentError, err: + if err.errno != errno.ENOENT: + return (False, "Error while backing up cluster file: %s" % str(err)) + utils.RemoveFile(constants.CLUSTER_CONF_FILE) + return (True, "Done") + + +def _FindDisks(nodes_ip, disks): + """Sets the physical ID on disks and returns the block devices. + + """ + # set the correct physical ID + my_name = utils.HostInfo().name + for cf in disks: + cf.SetPhysicalID(my_name, nodes_ip) + + bdevs = [] + + for cf in disks: + rd = _RecursiveFindBD(cf) + if rd is None: + return (False, "Can't find device %s" % cf) + bdevs.append(rd) + return (True, bdevs) + + +def DrbdDisconnectNet(nodes_ip, disks): + """Disconnects the network on a list of drbd devices. + + """ + status, bdevs = _FindDisks(nodes_ip, disks) + if not status: + return status, bdevs + + # disconnect disks + for rd in bdevs: + try: + rd.DisconnectNet() + except errors.BlockDeviceError, err: + logging.exception("Failed to go into standalone mode") + return (False, "Can't change network configuration: %s" % str(err)) + return (True, "All disks are now disconnected") + + +def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster): + """Attaches the network on a list of drbd devices. + + """ + status, bdevs = _FindDisks(nodes_ip, disks) + if not status: + return status, bdevs + + if multimaster: + for idx, rd in enumerate(bdevs): + try: + _SymlinkBlockDev(instance_name, rd.dev_path, idx) + except EnvironmentError, err: + return (False, "Can't create symlink: %s" % str(err)) + # reconnect disks, switch to new master configuration and if + # needed primary mode + for rd in bdevs: + try: + rd.AttachNet(multimaster) + except errors.BlockDeviceError, err: + return (False, "Can't change network configuration: %s" % str(err)) + # wait until the disks are connected; we need to retry the re-attach + # if the device becomes standalone, as this might happen if the one + # node disconnects and reconnects in a different mode before the + # other node reconnects; in this case, one or both of the nodes will + # decide it has wrong configuration and switch to standalone + RECONNECT_TIMEOUT = 2 * 60 + sleep_time = 0.100 # start with 100 miliseconds + timeout_limit = time.time() + RECONNECT_TIMEOUT + while time.time() < timeout_limit: + all_connected = True + for rd in bdevs: + stats = rd.GetProcStatus() + if not (stats.is_connected or stats.is_in_resync): + all_connected = False + if stats.is_standalone: + # peer had different config info and this node became + # standalone, even though this should not happen with the + # new staged way of changing disk configs + try: + rd.ReAttachNet(multimaster) + except errors.BlockDeviceError, err: + return (False, "Can't change network configuration: %s" % str(err)) + if all_connected: + break + time.sleep(sleep_time) + sleep_time = min(5, sleep_time * 1.5) + if not all_connected: + return (False, "Timeout in disk reconnecting") + if multimaster: + # change to primary mode + for rd in bdevs: + rd.Open() + if multimaster: + msg = "multi-master and primary" + else: + msg = "single-master" + return (True, "Disks are now 
configured as %s" % msg) + + +def DrbdWaitSync(nodes_ip, disks): + """Wait until DRBDs have synchronized. + + """ + status, bdevs = _FindDisks(nodes_ip, disks) + if not status: + return status, bdevs + + min_resync = 100 + alldone = True + failure = False + for rd in bdevs: + stats = rd.GetProcStatus() + if not (stats.is_connected or stats.is_in_resync): + failure = True + break + alldone = alldone and (not stats.is_in_resync) + if stats.sync_percent is not None: + min_resync = min(min_resync, stats.sync_percent) + return (not failure, (alldone, min_resync)) + + class HooksRunner(object): """Hook runner. - This class is instantiated on the node side (ganeti-noded) and not on - the master side. + This class is instantiated on the node side (ganeti-noded) and not + on the master side. """ RE_MASK = re.compile("^[a-zA-Z0-9_-]+$") @@ -1850,9 +2375,9 @@ class HooksRunner(object): def __init__(self, hooks_base_dir=None): """Constructor for hooks runner. - Args: - - hooks_base_dir: if not None, this overrides the - constants.HOOKS_BASE_DIR (useful for unittests) + @type hooks_base_dir: str or None + @param hooks_base_dir: if not None, this overrides the + L{constants.HOOKS_BASE_DIR} (useful for unittests) """ if hooks_base_dir is None: @@ -1863,9 +2388,15 @@ class HooksRunner(object): def ExecHook(script, env): """Exec one hook script. - Args: - - script: the full path to the script - - env: the environment with which to exec the script + @type script: str + @param script: the full path to the script + @type env: dict + @param env: the environment with which to exec the script + @rtype: tuple (success, message) + @return: a tuple of success and message, where success + indicates the succes of the operation, and message + which will contain the error details in case we + failed """ # exec the process using subprocess and log the output @@ -1906,7 +2437,23 @@ class HooksRunner(object): def RunHooks(self, hpath, phase, env): """Run the scripts in the hooks directory. - This method will not be usually overriden by child opcodes. + @type hpath: str + @param hpath: the path to the hooks directory which + holds the scripts + @type phase: str + @param phase: either L{constants.HOOKS_PHASE_PRE} or + L{constants.HOOKS_PHASE_POST} + @type env: dict + @param env: dictionary with the environment for the hook + @rtype: list + @return: list of 3-element tuples: + - script path + - script result, either L{constants.HKR_SUCCESS} or + L{constants.HKR_FAIL} + - output of the script + + @raise errors.ProgrammerError: for invalid input + parameters """ if phase == constants.HOOKS_PHASE_PRE: @@ -1922,7 +2469,7 @@ class HooksRunner(object): try: dir_contents = utils.ListVisibleFiles(dir_name) except OSError, err: - # must log + # FIXME: must log output in case of failures return rr # we use the standard python sort order, @@ -1955,11 +2502,17 @@ class IAllocatorRunner(object): def Run(self, name, idata): """Run an iallocator script. - Return value: tuple of: + @type name: str + @param name: the iallocator script name + @type idata: str + @param idata: the allocator input data + + @rtype: tuple + @return: four element tuple of: - run status (one of the IARUN_ constants) - stdout - stderr - - fail reason (as from utils.RunResult) + - fail reason (as from L{utils.RunResult}) """ alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH, @@ -1993,7 +2546,12 @@ class DevCacheManager(object): """Converts a /dev/name path to the cache file name. This replaces slashes with underscores and strips the /dev - prefix. 
It then returns the full path to the cache file + prefix. It then returns the full path to the cache file. + + @type dev_path: str + @param dev_path: the C{/dev/} path name + @rtype: str + @return: the converted path name """ if dev_path.startswith(cls._DEV_PREFIX): @@ -2006,6 +2564,19 @@ class DevCacheManager(object): def UpdateCache(cls, dev_path, owner, on_primary, iv_name): """Updates the cache information for a given device. + @type dev_path: str + @param dev_path: the pathname of the device + @type owner: str + @param owner: the owner (instance name) of the device + @type on_primary: bool + @param on_primary: whether this is the primary + node nor not + @type iv_name: str + @param iv_name: the instance-visible name of the + device, as in objects.Disk.iv_name + + @rtype: None + """ if dev_path is None: logging.error("DevCacheManager.UpdateCache got a None dev_path") @@ -2027,6 +2598,14 @@ class DevCacheManager(object): def RemoveCache(cls, dev_path): """Remove data for a dev_path. + This is just a wrapper over L{utils.RemoveFile} with a converted + path name and logging. + + @type dev_path: str + @param dev_path: the pathname of the device + + @rtype: None + """ if dev_path is None: logging.error("DevCacheManager.RemoveCache got a None dev_path")
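Finally, a sketch of the path conversion _ConvertPath above describes: strip the /dev/ prefix, replace slashes with underscores, and build the cache file name. The cache root directory and the "bdev_" file name prefix below are assumptions for illustration, not necessarily the real values.

import os

DEV_PREFIX = "/dev/"
ROOT_DIR = "/var/lib/ganeti/bdev-cache"   # assumed cache location

def convert_path(dev_path):
  if dev_path.startswith(DEV_PREFIX):
    dev_path = dev_path[len(DEV_PREFIX):]
  dev_path = dev_path.replace("/", "_")
  return os.path.join(ROOT_DIR, "bdev_%s" % dev_path)

# convert_path("/dev/xenvg/instance1-disk0")
#   -> "/var/lib/ganeti/bdev-cache/bdev_xenvg_instance1-disk0"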