X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/df5758b1f2312bcb9024f1a2cb3be835c973f8ee..b82d4c5e4e5751f12b61a71b16592fb20b8f822e:/lib/backend.py?ds=sidebyside diff --git a/lib/backend.py b/lib/backend.py index 4ef08ea..2bae5a2 100644 --- a/lib/backend.py +++ b/lib/backend.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc. +# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -76,6 +76,9 @@ _IES_STATUS_FILE = "status" _IES_PID_FILE = "pid" _IES_CA_FILE = "ca" +#: Valid LVS output line regex +_LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6})\|?$") + class RPCFail(Exception): """Class denoting RPC failure. @@ -440,19 +443,20 @@ def GetNodeInfo(vgname, hypervisor_type): """ outputarray = {} - vginfo = bdev.LogicalVolume.GetVGInfo([vgname]) - vg_free = vg_size = None - if vginfo: - vg_free = int(round(vginfo[0][0], 0)) - vg_size = int(round(vginfo[0][1], 0)) - - outputarray['vg_size'] = vg_size - outputarray['vg_free'] = vg_free - - hyper = hypervisor.GetHypervisor(hypervisor_type) - hyp_info = hyper.GetNodeInfo() - if hyp_info is not None: - outputarray.update(hyp_info) + if vgname is not None: + vginfo = bdev.LogicalVolume.GetVGInfo([vgname]) + vg_free = vg_size = None + if vginfo: + vg_free = int(round(vginfo[0][0], 0)) + vg_size = int(round(vginfo[0][1], 0)) + outputarray['vg_size'] = vg_size + outputarray['vg_free'] = vg_free + + if hypervisor_type is not None: + hyper = hypervisor.GetHypervisor(hypervisor_type) + hyp_info = hyper.GetNodeInfo() + if hyp_info is not None: + outputarray.update(hyp_info) outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n") @@ -491,8 +495,9 @@ def VerifyNode(what, cluster_name): result = {} my_name = netutils.Hostname.GetSysName() port = netutils.GetDaemonPort(constants.NODED) + vm_capable = my_name not in what.get(constants.NV_VMNODES, []) - if constants.NV_HYPERVISOR in what: + if constants.NV_HYPERVISOR in what and vm_capable: result[constants.NV_HYPERVISOR] = tmp = {} for hv_name in what[constants.NV_HYPERVISOR]: try: @@ -501,6 +506,15 @@ def VerifyNode(what, cluster_name): val = "Error while checking hypervisor: %s" % str(err) tmp[hv_name] = val + if constants.NV_HVPARAMS in what and vm_capable: + result[constants.NV_HVPARAMS] = tmp = [] + for source, hv_name, hvparms in what[constants.NV_HVPARAMS]: + try: + logging.info("Validating hv %s, %s", hv_name, hvparms) + hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms) + except errors.HypervisorError, err: + tmp.append((source, hv_name, str(err))) + if constants.NV_FILELIST in what: result[constants.NV_FILELIST] = utils.FingerprintFiles( what[constants.NV_FILELIST]) @@ -547,14 +561,30 @@ def VerifyNode(what, cluster_name): result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port, source=source) - if constants.NV_LVLIST in what: + if constants.NV_OOB_PATHS in what: + result[constants.NV_OOB_PATHS] = tmp = [] + for path in what[constants.NV_OOB_PATHS]: + try: + st = os.stat(path) + except OSError, err: + tmp.append("error stating out of band helper: %s" % err) + else: + if stat.S_ISREG(st.st_mode): + if stat.S_IMODE(st.st_mode) & stat.S_IXUSR: + tmp.append(None) + else: + tmp.append("out of band helper %s is not executable" % path) + else: + tmp.append("out of band helper %s is not a file" % path) + + if constants.NV_LVLIST in what and vm_capable: try: - val = GetVolumeList(what[constants.NV_LVLIST]) + val = GetVolumeList(utils.ListVolumeGroups().keys()) except RPCFail, err: val = str(err) result[constants.NV_LVLIST] = val - if constants.NV_INSTANCELIST in what: + if constants.NV_INSTANCELIST in what and vm_capable: # GetInstanceList can fail try: val = GetInstanceList(what[constants.NV_INSTANCELIST]) @@ -562,10 +592,10 @@ def VerifyNode(what, cluster_name): val = str(err) result[constants.NV_INSTANCELIST] = val - if constants.NV_VGLIST in what: + if constants.NV_VGLIST in what and vm_capable: result[constants.NV_VGLIST] = utils.ListVolumeGroups() - if constants.NV_PVLIST in what: + if constants.NV_PVLIST in what and vm_capable: result[constants.NV_PVLIST] = \ bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST], filter_allocatable=False) @@ -574,11 +604,11 @@ def VerifyNode(what, cluster_name): result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION, constants.RELEASE_VERSION) - if constants.NV_HVINFO in what: + if constants.NV_HVINFO in what and vm_capable: hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO]) result[constants.NV_HVINFO] = hyper.GetNodeInfo() - if constants.NV_DRBDLIST in what: + if constants.NV_DRBDLIST in what and vm_capable: try: used_minors = bdev.DRBD8.GetUsedDevs().keys() except errors.BlockDeviceError, err: @@ -586,7 +616,7 @@ def VerifyNode(what, cluster_name): used_minors = str(err) result[constants.NV_DRBDLIST] = used_minors - if constants.NV_DRBDHELPER in what: + if constants.NV_DRBDHELPER in what and vm_capable: status = True try: payload = bdev.BaseDRBD.GetUsermodeHelper() @@ -611,23 +641,24 @@ def VerifyNode(what, cluster_name): if constants.NV_TIME in what: result[constants.NV_TIME] = utils.SplitTime(time.time()) - if constants.NV_OSLIST in what: + if constants.NV_OSLIST in what and vm_capable: result[constants.NV_OSLIST] = DiagnoseOS() return result -def GetVolumeList(vg_name): +def GetVolumeList(vg_names): """Compute list of logical volumes and their size. - @type vg_name: str - @param vg_name: the volume group whose LVs we should list + @type vg_names: list + @param vg_names: the volume groups whose LVs we should list, or + empty for all volume groups @rtype: dict @return: dictionary of all partions (key) with value being a tuple of their size (in MiB), inactive and online status:: - {'test1': ('20.06', True, True)} + {'xenvg/test1': ('20.06', True, True)} in case of errors, a string is returned with the error details. @@ -635,20 +666,21 @@ def GetVolumeList(vg_name): """ lvs = {} sep = '|' + if not vg_names: + vg_names = [] result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix", "--separator=%s" % sep, - "-olv_name,lv_size,lv_attr", vg_name]) + "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names) if result.failed: _Fail("Failed to list logical volumes, lvs output: %s", result.output) - valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$") for line in result.stdout.splitlines(): line = line.strip() - match = valid_line_re.match(line) + match = _LVSLINE_REGEX.match(line) if not match: logging.error("Invalid line returned from lvs output: '%s'", line) continue - name, size, attr = match.groups() + vg_name, name, size, attr = match.groups() inactive = attr[4] == '-' online = attr[5] == 'o' virtual = attr[0] == 'v' @@ -656,7 +688,7 @@ def GetVolumeList(vg_name): # we don't want to report such volumes as existing, since they # don't really hold data continue - lvs[name] = (size, inactive, online) + lvs[vg_name+"/"+name] = (size, inactive, online) return lvs @@ -937,8 +969,8 @@ def RunRenameInstance(instance, old_name, debug): def _GetBlockDevSymlinkPath(instance_name, idx): - return utils.PathJoin(constants.DISK_LINKS_DIR, - "%s:%d" % (instance_name, idx)) + return utils.PathJoin(constants.DISK_LINKS_DIR, "%s%s%d" % + (instance_name, constants.DISK_SEPARATOR, idx)) def _SymlinkBlockDev(instance_name, device_path, idx): @@ -1287,6 +1319,87 @@ def BlockdevCreate(disk, size, owner, on_primary, info): return device.unique_id +def _WipeDevice(path, offset, size): + """This function actually wipes the device. + + @param path: The path to the device to wipe + @param offset: The offset in MiB in the file + @param size: The size in MiB to write + + """ + cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset, + "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path, + "count=%d" % size] + result = utils.RunCmd(cmd) + + if result.failed: + _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd, + result.fail_reason, result.output) + + +def BlockdevWipe(disk, offset, size): + """Wipes a block device. + + @type disk: L{objects.Disk} + @param disk: the disk object we want to wipe + @type offset: int + @param offset: The offset in MiB in the file + @type size: int + @param size: The size in MiB to write + + """ + try: + rdev = _RecursiveFindBD(disk) + except errors.BlockDeviceError: + rdev = None + + if not rdev: + _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name) + + # Do cross verify some of the parameters + if offset > rdev.size: + _Fail("Offset is bigger than device size") + if (offset + size) > rdev.size: + _Fail("The provided offset and size to wipe is bigger than device size") + + _WipeDevice(rdev.dev_path, offset, size) + + +def BlockdevPauseResumeSync(disks, pause): + """Pause or resume the sync of the block device. + + @type disks: list of L{objects.Disk} + @param disks: the disks object we want to pause/resume + @type pause: bool + @param pause: Wheater to pause or resume + + """ + success = [] + for disk in disks: + try: + rdev = _RecursiveFindBD(disk) + except errors.BlockDeviceError: + rdev = None + + if not rdev: + success.append((False, ("Cannot change sync for device %s:" + " device not found" % disk.iv_name))) + continue + + result = rdev.PauseResumeSync(pause) + + if result: + success.append((result, None)) + else: + if pause: + msg = "Pause" + else: + msg = "Resume" + success.append((result, "%s for device %s failed" % (msg, disk.iv_name))) + + return success + + def BlockdevRemove(disk): """Remove a block device. @@ -1492,9 +1605,7 @@ def BlockdevGetmirrorstatus(disks): @type disks: list of L{objects.Disk} @param disks: the list of disks which we should query @rtype: disk - @return: - a list of (mirror_done, estimated_time) tuples, which - are the result of L{bdev.BlockDev.CombinedSyncStatus} + @return: List of L{objects.BlockDevStatus}, one for each disk @raise errors.BlockDeviceError: if any of the disks cannot be found @@ -1510,6 +1621,37 @@ def BlockdevGetmirrorstatus(disks): return stats +def BlockdevGetmirrorstatusMulti(disks): + """Get the mirroring status of a list of devices. + + @type disks: list of L{objects.Disk} + @param disks: the list of disks which we should query + @rtype: disk + @return: List of tuples, (bool, status), one for each disk; bool denotes + success/failure, status is L{objects.BlockDevStatus} on success, string + otherwise + + """ + result = [] + for disk in disks: + try: + rbd = _RecursiveFindBD(disk) + if rbd is None: + result.append((False, "Can't find device %s" % disk)) + continue + + status = rbd.CombinedSyncStatus() + except errors.BlockDeviceError, err: + logging.exception("Error while getting disk status") + result.append((False, str(err))) + else: + result.append((True, status)) + + assert len(disks) == len(result) + + return result + + def _RecursiveFindBD(disk): """Check if a device is activated. @@ -1671,8 +1813,30 @@ def UploadFile(file_name, data, mode, uid, gid, atime, mtime): raw_data = _Decompress(data) - utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid, - atime=atime, mtime=mtime) + utils.SafeWriteFile(file_name, None, + data=raw_data, mode=mode, uid=uid, gid=gid, + atime=atime, mtime=mtime) + + +def RunOob(oob_program, command, node, timeout): + """Executes oob_program with given command on given node. + + @param oob_program: The path to the executable oob_program + @param command: The command to invoke on oob_program + @param node: The node given as an argument to the program + @param timeout: Timeout after which we kill the oob program + + @return: stdout + @raise RPCFail: If execution fails for some reason + + """ + result = utils.RunCmd([oob_program, command, node], timeout=timeout) + + if result.failed: + _Fail("'%s' failed with reason '%s'; output: %s", result.cmd, + result.fail_reason, result.output) + + return result.stdout def WriteSsconfFiles(values): @@ -1966,8 +2130,9 @@ def OSEnvironment(instance, inst_os, debug=0): """ result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug) - result['INSTANCE_NAME'] = instance.name - result['INSTANCE_OS'] = instance.os + for attr in ["name", "os", "uuid", "ctime", "mtime"]: + result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr)) + result['HYPERVISOR'] = instance.hypervisor result['DISK_COUNT'] = '%d' % len(instance.disks) result['NIC_COUNT'] = '%d' % len(instance.nics) @@ -2041,7 +2206,7 @@ def BlockdevSnapshot(disk): @type disk: L{objects.Disk} @param disk: the disk to be snapshotted @rtype: string - @return: snapshot disk path + @return: snapshot disk ID as (vg, lv) """ if disk.dev_type == constants.LD_DRBD8: @@ -2359,7 +2524,7 @@ def _EnsureJobQueueFile(file_name): def JobQueueUpdate(file_name, content): """Updates a file in the queue directory. - This is just a wrapper over L{utils.WriteFile}, with proper + This is just a wrapper over L{utils.io.WriteFile}, with proper checking. @type file_name: str @@ -2797,6 +2962,11 @@ def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs): if port: cmd.append("--port=%s" % port) + if opts.ipv6: + cmd.append("--ipv6") + else: + cmd.append("--ipv4") + if opts.compress: cmd.append("--compress=%s" % opts.compress) @@ -2812,6 +2982,15 @@ def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs): if cmd_suffix: cmd.append("--cmd-suffix=%s" % cmd_suffix) + if mode == constants.IEM_EXPORT: + # Retry connection a few times when connecting to remote peer + cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES) + cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT) + elif opts.connect_timeout is not None: + assert mode == constants.IEM_IMPORT + # Overall timeout for establishing connection while listening + cmd.append("--connect-timeout=%s" % opts.connect_timeout) + logfile = _InstanceLogName(prefix, instance.os, instance.name) # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has @@ -3241,7 +3420,7 @@ class DevCacheManager(object): def RemoveCache(cls, dev_path): """Remove data for a dev_path. - This is just a wrapper over L{utils.RemoveFile} with a converted + This is just a wrapper over L{utils.io.RemoveFile} with a converted path name and logging. @type dev_path: str