X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/78feb6fb918213135236381eea192c38e0cb36c8..8c229cc7e24f1c7075b38ff0e21aab7c75299c4d:/lib/utils.py diff --git a/lib/utils.py b/lib/utils.py index d5d604b..ceacc9b 100644 --- a/lib/utils.py +++ b/lib/utils.py @@ -20,6 +20,7 @@ """Ganeti small utilities + """ @@ -35,14 +36,24 @@ import shutil import errno import pwd import itertools +import select +import fcntl +import resource +import logging + +from cStringIO import StringIO -from ganeti import logger from ganeti import errors +from ganeti import constants _locksheld = [] _re_shell_unquoted = re.compile('^[-.,=:/_+@A-Za-z0-9]+$') +debug = False +no_fork = False + + class RunResult(object): """Simple class for holding the result of running external programs. @@ -77,6 +88,10 @@ class RunResult(object): else: self.fail_reason = "unable to determine termination reason" + if self.failed: + logging.debug("Command '%s' failed (%s); output: %s", + self.cmd, self.fail_reason, self.output) + def _GetOutput(self): """Returns the combined stdout and stderr for easier usage. @@ -86,113 +101,6 @@ class RunResult(object): output = property(_GetOutput, None, None, "Return full output") -def _GetLockFile(subsystem): - """Compute the file name for a given lock name.""" - return "/var/lock/ganeti_lock_%s" % subsystem - - -def Lock(name, max_retries=None, debug=False): - """Lock a given subsystem. - - In case the lock is already held by an alive process, the function - will sleep indefintely and poll with a one second interval. - - When the optional integer argument 'max_retries' is passed with a - non-zero value, the function will sleep only for this number of - times, and then it will will raise a LockError if the lock can't be - acquired. Passing in a negative number will cause only one try to - get the lock. Passing a positive number will make the function retry - for approximately that number of seconds. - - """ - lockfile = _GetLockFile(name) - - if name in _locksheld: - raise errors.LockError('Lock "%s" already held!' % (name,)) - - errcount = 0 - - retries = 0 - while True: - try: - fd = os.open(lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR | os.O_SYNC) - break - except OSError, creat_err: - if creat_err.errno != errno.EEXIST: - raise errors.LockError("Can't create the lock file. Error '%s'." % - str(creat_err)) - - try: - pf = open(lockfile, 'r') - except IOError, open_err: - errcount += 1 - if errcount >= 5: - raise errors.LockError("Lock file exists but cannot be opened." - " Error: '%s'." % str(open_err)) - time.sleep(1) - continue - - try: - pid = int(pf.read()) - except ValueError: - raise errors.LockError("Invalid pid string in %s" % - (lockfile,)) - - if not IsProcessAlive(pid): - raise errors.LockError("Stale lockfile %s for pid %d?" % - (lockfile, pid)) - - if max_retries and max_retries <= retries: - raise errors.LockError("Can't acquire lock during the specified" - " time, aborting.") - if retries == 5 and (debug or sys.stdin.isatty()): - logger.ToStderr("Waiting for '%s' lock from pid %d..." % (name, pid)) - - time.sleep(1) - retries += 1 - continue - - os.write(fd, '%d\n' % (os.getpid(),)) - os.close(fd) - - _locksheld.append(name) - - -def Unlock(name): - """Unlock a given subsystem. - - """ - lockfile = _GetLockFile(name) - - try: - fd = os.open(lockfile, os.O_RDONLY) - except OSError: - raise errors.LockError('Lock "%s" not held.' % (name,)) - - f = os.fdopen(fd, 'r') - pid_str = f.read() - - try: - pid = int(pid_str) - except ValueError: - raise errors.LockError('Unable to determine PID of locking process.') - - if pid != os.getpid(): - raise errors.LockError('Lock not held by me (%d != %d)' % - (os.getpid(), pid,)) - - os.unlink(lockfile) - _locksheld.remove(name) - - -def LockCleanup(): - """Remove all locks. - - """ - for lock in _locksheld: - Unlock(lock) - - def RunCmd(cmd): """Execute a (shell) command. @@ -205,6 +113,9 @@ def RunCmd(cmd): Returns: `RunResult` instance """ + if no_fork: + raise errors.ProgrammerError("utils.RunCmd() called with fork() disabled") + if isinstance(cmd, list): cmd = [str(val) for val in cmd] strcmd = " ".join(cmd) @@ -212,8 +123,10 @@ def RunCmd(cmd): else: strcmd = cmd shell = True + logging.debug("RunCmd '%s'", strcmd) env = os.environ.copy() env["LC_ALL"] = "C" + poller = select.poll() child = subprocess.Popen(cmd, shell=shell, stderr=subprocess.PIPE, stdout=subprocess.PIPE, @@ -221,8 +134,35 @@ def RunCmd(cmd): close_fds=True, env=env) child.stdin.close() - out = child.stdout.read() - err = child.stderr.read() + poller.register(child.stdout, select.POLLIN) + poller.register(child.stderr, select.POLLIN) + out = StringIO() + err = StringIO() + fdmap = { + child.stdout.fileno(): (out, child.stdout), + child.stderr.fileno(): (err, child.stderr), + } + for fd in fdmap: + status = fcntl.fcntl(fd, fcntl.F_GETFL) + fcntl.fcntl(fd, fcntl.F_SETFL, status | os.O_NONBLOCK) + + while fdmap: + for fd, event in poller.poll(): + if event & select.POLLIN or event & select.POLLPRI: + data = fdmap[fd][1].read() + # no data from read signifies EOF (the same as POLLHUP) + if not data: + poller.unregister(fd) + del fdmap[fd] + continue + fdmap[fd][0].write(data) + if (event & select.POLLNVAL or event & select.POLLHUP or + event & select.POLLERR): + poller.unregister(fd) + del fdmap[fd] + + out = out.getvalue() + err = err.getvalue() status = child.wait() if status >= 0: @@ -235,30 +175,6 @@ def RunCmd(cmd): return RunResult(exitcode, signal, out, err, strcmd) -def RunCmdUnlocked(cmd): - """Execute a shell command without the 'cmd' lock. - - This variant of `RunCmd()` drops the 'cmd' lock before running the - command and re-aquires it afterwards, thus it can be used to call - other ganeti commands. - - The argument and return values are the same as for the `RunCmd()` - function. - - Args: - cmd - command to run. (str) - - Returns: - `RunResult` - - """ - Unlock('cmd') - ret = RunCmd(cmd) - Lock('cmd') - - return ret - - def RemoveFile(filename): """Remove a file ignoring some errors. @@ -343,8 +259,7 @@ def CheckDict(target, template, logname=None): target[k] = template[k] if missing and logname: - logger.Debug('%s missing keys %s' % - (logname, ', '.join(missing))) + logging.warning('%s missing keys %s', logname, ', '.join(missing)) def IsProcessAlive(pid): @@ -417,6 +332,12 @@ class HostInfo: self.name, self.aliases, self.ipaddrs = self.LookupHostname(name) self.ip = self.ipaddrs[0] + def ShortName(self): + """Returns the hostname without domain. + + """ + return self.name.split('.')[0] + @staticmethod def SysName(): """Return the current system's name. @@ -465,7 +386,7 @@ def ListVolumeGroups(): name, size = line.split() size = int(float(size)) except (IndexError, ValueError), err: - logger.Error("Invalid output from vgs (%s): %s" % (err, line)) + logging.error("Invalid output from vgs (%s): %s", err, line) continue retval[name] = size @@ -517,59 +438,6 @@ def NiceSort(name_list): return [tup[1] for tup in to_sort] -def CheckDaemonAlive(pid_file, process_string): - """Check wether the specified daemon is alive. - - Args: - - pid_file: file to read the daemon pid from, the file is - expected to contain only a single line containing - only the PID - - process_string: a substring that we expect to find in - the command line of the daemon process - - Returns: - - True if the daemon is judged to be alive (that is: - - the PID file exists, is readable and contains a number - - a process of the specified PID is running - - that process contains the specified string in its - command line - - the process is not in state Z (zombie)) - - False otherwise - - """ - try: - pid_file = file(pid_file, 'r') - try: - pid = int(pid_file.readline()) - finally: - pid_file.close() - - cmdline_file_path = "/proc/%s/cmdline" % (pid) - cmdline_file = open(cmdline_file_path, 'r') - try: - cmdline = cmdline_file.readline() - finally: - cmdline_file.close() - - if not process_string in cmdline: - return False - - stat_file_path = "/proc/%s/stat" % (pid) - stat_file = open(stat_file_path, 'r') - try: - process_state = stat_file.readline().split()[2] - finally: - stat_file.close() - - if process_state == 'Z': - return False - - except (IndexError, IOError, ValueError): - return False - - return True - - def TryConvert(fn, val): """Try to convert a value ignoring errors. @@ -724,21 +592,115 @@ def RemoveAuthorizedKey(file_name, key): key_fields = key.split() fd, tmpname = tempfile.mkstemp(dir=os.path.dirname(file_name)) - out = os.fdopen(fd, 'w') try: - f = open(file_name, 'r') + out = os.fdopen(fd, 'w') try: - for line in f: - # Ignore whitespace changes while comparing lines - if line.split() != key_fields: + f = open(file_name, 'r') + try: + for line in f: + # Ignore whitespace changes while comparing lines + if line.split() != key_fields: + out.write(line) + + out.flush() + os.rename(tmpname, file_name) + finally: + f.close() + finally: + out.close() + except: + RemoveFile(tmpname) + raise + + +def SetEtcHostsEntry(file_name, ip, hostname, aliases): + """Sets the name of an IP address and hostname in /etc/hosts. + + """ + # Ensure aliases are unique + aliases = UniqueSequence([hostname] + aliases)[1:] + + fd, tmpname = tempfile.mkstemp(dir=os.path.dirname(file_name)) + try: + out = os.fdopen(fd, 'w') + try: + f = open(file_name, 'r') + try: + written = False + for line in f: + fields = line.split() + if fields and not fields[0].startswith('#') and ip == fields[0]: + continue out.write(line) - out.flush() - os.rename(tmpname, file_name) + out.write("%s\t%s" % (ip, hostname)) + if aliases: + out.write(" %s" % ' '.join(aliases)) + out.write('\n') + + out.flush() + os.fsync(out) + os.rename(tmpname, file_name) + finally: + f.close() finally: - f.close() - finally: - out.close() + out.close() + except: + RemoveFile(tmpname) + raise + + +def AddHostToEtcHosts(hostname): + """Wrapper around SetEtcHostsEntry. + + """ + hi = HostInfo(name=hostname) + SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()]) + + +def RemoveEtcHostsEntry(file_name, hostname): + """Removes a hostname from /etc/hosts. + + IP addresses without names are removed from the file. + """ + fd, tmpname = tempfile.mkstemp(dir=os.path.dirname(file_name)) + try: + out = os.fdopen(fd, 'w') + try: + f = open(file_name, 'r') + try: + for line in f: + fields = line.split() + if len(fields) > 1 and not fields[0].startswith('#'): + names = fields[1:] + if hostname in names: + while hostname in names: + names.remove(hostname) + if names: + out.write("%s %s\n" % (fields[0], ' '.join(names))) + continue + + out.write(line) + + out.flush() + os.fsync(out) + os.rename(tmpname, file_name) + finally: + f.close() + finally: + out.close() + except: + RemoveFile(tmpname) + raise + + +def RemoveHostFromEtcHosts(hostname): + """Wrapper around RemoveEtcHostsEntry. + + """ + hi = HostInfo(name=hostname) + RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name) + RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName()) def CreateBackup(file_name): @@ -752,11 +714,11 @@ def CreateBackup(file_name): file_name) prefix = '%s.backup-%d.' % (os.path.basename(file_name), int(time.time())) - dir = os.path.dirname(file_name) + dir_name = os.path.dirname(file_name) fsrc = open(file_name, 'rb') try: - (fd, backup_name) = tempfile.mkstemp(prefix=prefix, dir=dir) + (fd, backup_name) = tempfile.mkstemp(prefix=prefix, dir=dir_name) fdst = os.fdopen(fd, 'wb') try: shutil.copyfileobj(fsrc, fdst) @@ -785,25 +747,29 @@ def ShellQuoteArgs(args): return ' '.join([ShellQuote(i) for i in args]) - -def TcpPing(source, target, port, timeout=10, live_port_needed=False): +def TcpPing(target, port, timeout=10, live_port_needed=False, source=None): """Simple ping implementation using TCP connect(2). - Try to do a TCP connect(2) from the specified source IP to the specified - target IP and the specified target port. If live_port_needed is set to true, - requires the remote end to accept the connection. The timeout is specified - in seconds and defaults to 10 seconds + Try to do a TCP connect(2) from an optional source IP to the + specified target IP and the specified target port. If the optional + parameter live_port_needed is set to true, requires the remote end + to accept the connection. The timeout is specified in seconds and + defaults to 10 seconds. If the source optional argument is not + passed, the source address selection is left to the kernel, + otherwise we try to connect using the passed address (failures to + bind other than EADDRNOTAVAIL will be ignored). """ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sucess = False - try: - sock.bind((source, 0)) - except socket.error, (errcode, errstring): - if errcode == errno.EADDRNOTAVAIL: - success = False + if source is not None: + try: + sock.bind((source, 0)) + except socket.error, (errcode, errstring): + if errcode == errno.EADDRNOTAVAIL: + success = False sock.settimeout(timeout) @@ -823,7 +789,9 @@ def ListVisibleFiles(path): """Returns a list of all visible files in a directory. """ - return [i for i in os.listdir(path) if not i.startswith(".")] + files = [i for i in os.listdir(path) if not i.startswith(".")] + files.sort() + return files def GetHomeDir(user, default=None): @@ -847,7 +815,7 @@ def GetHomeDir(user, default=None): return result.pw_dir -def GetUUID(): +def NewUUID(): """Returns a random UUID. """ @@ -860,7 +828,9 @@ def GetUUID(): def WriteFile(file_name, fn=None, data=None, mode=None, uid=-1, gid=-1, - atime=None, mtime=None): + atime=None, mtime=None, close=True, + dry_run=False, backup=False, + prewrite=None, postwrite=None): """(Over)write a file atomically. The file_name and either fn (a function taking one argument, the @@ -874,6 +844,22 @@ def WriteFile(file_name, fn=None, data=None, exception, an existing target file should be unmodified and the temporary file should be removed. + Args: + file_name: New filename + fn: Content writing function, called with file descriptor as parameter + data: Content as string + mode: File mode + uid: Owner + gid: Group + atime: Access time + mtime: Modification time + close: Whether to close file after writing it + prewrite: Function object called before writing content + postwrite: Function object called after writing content + + Returns: + None if "close" parameter evaluates to True, otherwise file descriptor. + """ if not os.path.isabs(file_name): raise errors.ProgrammerError("Path passed to WriteFile is not" @@ -886,6 +872,8 @@ def WriteFile(file_name, fn=None, data=None, raise errors.ProgrammerError("Both atime and mtime must be either" " set or None") + if backup and not dry_run and os.path.isfile(file_name): + CreateBackup(file_name) dir_name, base_name = os.path.split(file_name) fd, new_name = tempfile.mkstemp('.new', base_name, dir_name) @@ -896,18 +884,50 @@ def WriteFile(file_name, fn=None, data=None, os.chown(new_name, uid, gid) if mode: os.chmod(new_name, mode) + if callable(prewrite): + prewrite(fd) if data is not None: os.write(fd, data) else: fn(fd) + if callable(postwrite): + postwrite(fd) os.fsync(fd) if atime is not None and mtime is not None: os.utime(new_name, (atime, mtime)) - os.rename(new_name, file_name) + if not dry_run: + os.rename(new_name, file_name) finally: - os.close(fd) + if close: + os.close(fd) + result = None + else: + result = fd RemoveFile(new_name) + return result + + +def FirstFree(seq, base=0): + """Returns the first non-existing integer from seq. + + The seq argument should be a sorted list of positive integers. The + first time the index of an element is smaller than the element + value, the index will be returned. + + The base argument is used to start at a different offset, + i.e. [3, 4, 6] with offset=3 will return 5. + + Example: [0, 1, 3] will return 2. + + """ + for idx, elem in enumerate(seq): + assert elem >= base, "Passed element is higher than base offset" + if elem > idx + base: + # idx is not used + return idx + base + return None + def all(seq, pred=bool): "Returns True if pred(x) is True for every element in the iterable" @@ -921,3 +941,156 @@ def any(seq, pred=bool): for elem in itertools.ifilter(pred, seq): return True return False + + +def UniqueSequence(seq): + """Returns a list with unique elements. + + Element order is preserved. + """ + seen = set() + return [i for i in seq if i not in seen and not seen.add(i)] + + +def IsValidMac(mac): + """Predicate to check if a MAC address is valid. + + Checks wether the supplied MAC address is formally correct, only + accepts colon separated format. + """ + mac_check = re.compile("^([0-9a-f]{2}(:|$)){6}$") + return mac_check.match(mac) is not None + + +def TestDelay(duration): + """Sleep for a fixed amount of time. + + """ + if duration < 0: + return False + time.sleep(duration) + return True + + +def Daemonize(logfile, noclose_fds=None): + """Daemonize the current process. + + This detaches the current process from the controlling terminal and + runs it in the background as a daemon. + + """ + UMASK = 077 + WORKDIR = "/" + # Default maximum for the number of available file descriptors. + if 'SC_OPEN_MAX' in os.sysconf_names: + try: + MAXFD = os.sysconf('SC_OPEN_MAX') + if MAXFD < 0: + MAXFD = 1024 + except OSError: + MAXFD = 1024 + else: + MAXFD = 1024 + + # this might fail + pid = os.fork() + if (pid == 0): # The first child. + os.setsid() + # this might fail + pid = os.fork() # Fork a second child. + if (pid == 0): # The second child. + os.chdir(WORKDIR) + os.umask(UMASK) + else: + # exit() or _exit()? See below. + os._exit(0) # Exit parent (the first child) of the second child. + else: + os._exit(0) # Exit parent of the first child. + maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1] + if (maxfd == resource.RLIM_INFINITY): + maxfd = MAXFD + + # Iterate through and close all file descriptors. + for fd in range(0, maxfd): + if noclose_fds and fd in noclose_fds: + continue + try: + os.close(fd) + except OSError: # ERROR, fd wasn't open to begin with (ignored) + pass + os.open(logfile, os.O_RDWR|os.O_CREAT|os.O_APPEND, 0600) + # Duplicate standard input to standard output and standard error. + os.dup2(0, 1) # standard output (1) + os.dup2(0, 2) # standard error (2) + return 0 + + +def FindFile(name, search_path, test=os.path.exists): + """Look for a filesystem object in a given path. + + This is an abstract method to search for filesystem object (files, + dirs) under a given search path. + + Args: + - name: the name to look for + - search_path: list of directory names + - test: the test which the full path must satisfy + (defaults to os.path.exists) + + Returns: + - full path to the item if found + - None otherwise + + """ + for dir_name in search_path: + item_name = os.path.sep.join([dir_name, name]) + if test(item_name): + return item_name + return None + + +def CheckVolumeGroupSize(vglist, vgname, minsize): + """Checks if the volume group list is valid. + + A non-None return value means there's an error, and the return value + is the error message. + + """ + vgsize = vglist.get(vgname, None) + if vgsize is None: + return "volume group '%s' missing" % vgname + elif vgsize < minsize: + return ("volume group '%s' too small (%s MiB required, %d MiB found)" % + (vgname, minsize, vgsize)) + return None + + +def LockedMethod(fn): + """Synchronized object access decorator. + + This decorator is intended to protect access to an object using the + object's own lock which is hardcoded to '_lock'. + + """ + def wrapper(self, *args, **kwargs): + assert hasattr(self, '_lock') + lock = self._lock + lock.acquire() + try: + result = fn(self, *args, **kwargs) + finally: + lock.release() + return result + return wrapper + + +def LockFile(fd): + """Locks a file using POSIX locks. + + """ + try: + fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + except IOError, err: + if err.errno == errno.EAGAIN: + raise errors.LockError("File already locked") + raise