X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/04a8d78965d14034d4ca0e939ea656fc343281be..8161a64679c5b3c46a275bc3ae8e13b69c902993:/lib/utils.py diff --git a/lib/utils.py b/lib/utils.py index 4e8cfe7..23fd1eb 100644 --- a/lib/utils.py +++ b/lib/utils.py @@ -39,10 +39,11 @@ import itertools import select import fcntl import resource +import logging +import signal from cStringIO import StringIO -from ganeti import logger from ganeti import errors from ganeti import constants @@ -73,13 +74,13 @@ class RunResult(object): "failed", "fail_reason", "cmd"] - def __init__(self, exit_code, signal, stdout, stderr, cmd): + def __init__(self, exit_code, signal_, stdout, stderr, cmd): self.cmd = cmd self.exit_code = exit_code - self.signal = signal + self.signal = signal_ self.stdout = stdout self.stderr = stderr - self.failed = (signal is not None or exit_code != 0) + self.failed = (signal_ is not None or exit_code != 0) if self.signal is not None: self.fail_reason = "terminated by signal %s" % self.signal @@ -88,9 +89,9 @@ class RunResult(object): else: self.fail_reason = "unable to determine termination reason" - if debug and self.failed: - logger.Debug("Command '%s' failed (%s); output: %s" % - (self.cmd, self.fail_reason, self.output)) + if self.failed: + logging.debug("Command '%s' failed (%s); output: %s", + self.cmd, self.fail_reason, self.output) def _GetOutput(self): """Returns the combined stdout and stderr for easier usage. @@ -101,113 +102,6 @@ class RunResult(object): output = property(_GetOutput, None, None, "Return full output") -def _GetLockFile(subsystem): - """Compute the file name for a given lock name.""" - return "%s/ganeti_lock_%s" % (constants.LOCK_DIR, subsystem) - - -def Lock(name, max_retries=None, debug=False): - """Lock a given subsystem. - - In case the lock is already held by an alive process, the function - will sleep indefintely and poll with a one second interval. - - When the optional integer argument 'max_retries' is passed with a - non-zero value, the function will sleep only for this number of - times, and then it will will raise a LockError if the lock can't be - acquired. Passing in a negative number will cause only one try to - get the lock. Passing a positive number will make the function retry - for approximately that number of seconds. - - """ - lockfile = _GetLockFile(name) - - if name in _locksheld: - raise errors.LockError('Lock "%s" already held!' % (name,)) - - errcount = 0 - - retries = 0 - while True: - try: - fd = os.open(lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR | os.O_SYNC) - break - except OSError, creat_err: - if creat_err.errno != errno.EEXIST: - raise errors.LockError("Can't create the lock file. Error '%s'." % - str(creat_err)) - - try: - pf = open(lockfile, 'r') - except IOError, open_err: - errcount += 1 - if errcount >= 5: - raise errors.LockError("Lock file exists but cannot be opened." - " Error: '%s'." % str(open_err)) - time.sleep(1) - continue - - try: - pid = int(pf.read()) - except ValueError: - raise errors.LockError("Invalid pid string in %s" % - (lockfile,)) - - if not IsProcessAlive(pid): - raise errors.LockError("Stale lockfile %s for pid %d?" % - (lockfile, pid)) - - if max_retries and max_retries <= retries: - raise errors.LockError("Can't acquire lock during the specified" - " time, aborting.") - if retries == 5 and (debug or sys.stdin.isatty()): - logger.ToStderr("Waiting for '%s' lock from pid %d..." % (name, pid)) - - time.sleep(1) - retries += 1 - continue - - os.write(fd, '%d\n' % (os.getpid(),)) - os.close(fd) - - _locksheld.append(name) - - -def Unlock(name): - """Unlock a given subsystem. - - """ - lockfile = _GetLockFile(name) - - try: - fd = os.open(lockfile, os.O_RDONLY) - except OSError: - raise errors.LockError('Lock "%s" not held.' % (name,)) - - f = os.fdopen(fd, 'r') - pid_str = f.read() - - try: - pid = int(pid_str) - except ValueError: - raise errors.LockError('Unable to determine PID of locking process.') - - if pid != os.getpid(): - raise errors.LockError('Lock not held by me (%d != %d)' % - (os.getpid(), pid,)) - - os.unlink(lockfile) - _locksheld.remove(name) - - -def LockCleanup(): - """Remove all locks. - - """ - for lock in _locksheld: - Unlock(lock) - - def RunCmd(cmd): """Execute a (shell) command. @@ -230,6 +124,7 @@ def RunCmd(cmd): else: strcmd = cmd shell = True + logging.debug("RunCmd '%s'", strcmd) env = os.environ.copy() env["LC_ALL"] = "C" poller = select.poll() @@ -273,36 +168,12 @@ def RunCmd(cmd): status = child.wait() if status >= 0: exitcode = status - signal = None + signal_ = None else: exitcode = None - signal = -status - - return RunResult(exitcode, signal, out, err, strcmd) - - -def RunCmdUnlocked(cmd): - """Execute a shell command without the 'cmd' lock. - - This variant of `RunCmd()` drops the 'cmd' lock before running the - command and re-aquires it afterwards, thus it can be used to call - other ganeti commands. - - The argument and return values are the same as for the `RunCmd()` - function. + signal_ = -status - Args: - cmd - command to run. (str) - - Returns: - `RunResult` - - """ - Unlock('cmd') - ret = RunCmd(cmd) - Lock('cmd') - - return ret + return RunResult(exitcode, signal_, out, err, strcmd) def RemoveFile(filename): @@ -389,8 +260,7 @@ def CheckDict(target, template, logname=None): target[k] = template[k] if missing and logname: - logger.Debug('%s missing keys %s' % - (logname, ', '.join(missing))) + logging.warning('%s missing keys %s', logname, ', '.join(missing)) def IsProcessAlive(pid): @@ -398,9 +268,13 @@ def IsProcessAlive(pid): Returns: true or false, depending on if the pid exists or not - Remarks: zombie processes treated as not alive + Remarks: zombie processes treated as not alive, and giving a pid <= + 0 makes the function to return False. """ + if pid <= 0: + return False + try: f = open("/proc/%d/status" % pid) except IOError, err: @@ -420,6 +294,32 @@ def IsProcessAlive(pid): return alive +def ReadPidFile(pidfile): + """Read the pid from a file. + + @param pidfile: Path to a file containing the pid to be checked + @type pidfile: string (filename) + @return: The process id, if the file exista and contains a valid PID, + otherwise 0 + @rtype: int + + """ + try: + pf = open(pidfile, 'r') + except EnvironmentError, err: + if err.errno != errno.ENOENT: + logging.exception("Can't read pid file?!") + return 0 + + try: + pid = int(pf.read()) + except ValueError, err: + logging.info("Can't parse pid file contents", exc_info=True) + return 0 + + return pid + + def MatchNameComponent(key, name_list): """Try to match a name against a list. @@ -517,7 +417,7 @@ def ListVolumeGroups(): name, size = line.split() size = int(float(size)) except (IndexError, ValueError), err: - logger.Error("Invalid output from vgs (%s): %s" % (err, line)) + logging.error("Invalid output from vgs (%s): %s", err, line) continue retval[name] = size @@ -1039,6 +939,27 @@ def WriteFile(file_name, fn=None, data=None, return result +def FirstFree(seq, base=0): + """Returns the first non-existing integer from seq. + + The seq argument should be a sorted list of positive integers. The + first time the index of an element is smaller than the element + value, the index will be returned. + + The base argument is used to start at a different offset, + i.e. [3, 4, 6] with offset=3 will return 5. + + Example: [0, 1, 3] will return 2. + + """ + for idx, elem in enumerate(seq): + assert elem >= base, "Passed element is higher than base offset" + if elem > idx + base: + # idx is not used + return idx + base + return None + + def all(seq, pred=bool): "Returns True if pred(x) is True for every element in the iterable" for elem in itertools.ifilterfalse(pred, seq): @@ -1135,6 +1056,71 @@ def Daemonize(logfile, noclose_fds=None): return 0 +def DaemonPidFileName(name): + """Compute a ganeti pid file absolute path, given the daemon name. + + """ + return os.path.join(constants.RUN_GANETI_DIR, "%s.pid" % name) + + +def WritePidFile(name): + """Write the current process pidfile. + + The file will be written to constants.RUN_GANETI_DIR/name.pid + + """ + pid = os.getpid() + pidfilename = DaemonPidFileName(name) + if IsProcessAlive(ReadPidFile(pidfilename)): + raise errors.GenericError("%s contains a live process" % pidfilename) + + WriteFile(pidfilename, data="%d\n" % pid) + + +def RemovePidFile(name): + """Remove the current process pidfile. + + Any errors are ignored. + + """ + pid = os.getpid() + pidfilename = DaemonPidFileName(name) + # TODO: we could check here that the file contains our pid + try: + RemoveFile(pidfilename) + except: + pass + + +def KillProcess(pid, signal_=signal.SIGTERM, timeout=30): + """Kill a process given by its pid. + + @type pid: int + @param pid: The PID to terminate. + @type signal_: int + @param signal_: The signal to send, by default SIGTERM + @type timeout: int + @param timeout: The timeout after which, if the process is still alive, + a SIGKILL will be sent. If not positive, no such checking + will be done + + """ + if pid <= 0: + # kill with pid=0 == suicide + raise errors.ProgrammerError("Invalid pid given '%s'" % pid) + + if not IsProcessAlive(pid): + return + os.kill(pid, signal_) + if timeout <= 0: + return + end = time.time() + timeout + while time.time() < end and IsProcessAlive(pid): + time.sleep(0.1) + if IsProcessAlive(pid): + os.kill(pid, signal.SIGKILL) + + def FindFile(name, search_path, test=os.path.exists): """Look for a filesystem object in a given path. @@ -1173,3 +1159,102 @@ def CheckVolumeGroupSize(vglist, vgname, minsize): return ("volume group '%s' too small (%s MiB required, %d MiB found)" % (vgname, minsize, vgsize)) return None + + +def LockedMethod(fn): + """Synchronized object access decorator. + + This decorator is intended to protect access to an object using the + object's own lock which is hardcoded to '_lock'. + + """ + def wrapper(self, *args, **kwargs): + assert hasattr(self, '_lock') + lock = self._lock + lock.acquire() + try: + result = fn(self, *args, **kwargs) + finally: + lock.release() + return result + return wrapper + + +def LockFile(fd): + """Locks a file using POSIX locks. + + """ + try: + fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + except IOError, err: + if err.errno == errno.EAGAIN: + raise errors.LockError("File already locked") + raise + + +class SignalHandler(object): + """Generic signal handler class. + + It automatically restores the original handler when deconstructed or when + Reset() is called. You can either pass your own handler function in or query + the "called" attribute to detect whether the signal was sent. + + """ + def __init__(self, signum): + """Constructs a new SignalHandler instance. + + @param signum: Single signal number or set of signal numbers + + """ + if isinstance(signum, (int, long)): + self.signum = set([signum]) + else: + self.signum = set(signum) + + self.called = False + + self._previous = {} + try: + for signum in self.signum: + # Setup handler + prev_handler = signal.signal(signum, self._HandleSignal) + try: + self._previous[signum] = prev_handler + except: + # Restore previous handler + signal.signal(signum, prev_handler) + raise + except: + # Reset all handlers + self.Reset() + # Here we have a race condition: a handler may have already been called, + # but there's not much we can do about it at this point. + raise + + def __del__(self): + self.Reset() + + def Reset(self): + """Restore previous handler. + + """ + for signum, prev_handler in self._previous.items(): + signal.signal(signum, prev_handler) + # If successful, remove from dict + del self._previous[signum] + + def Clear(self): + """Unsets "called" flag. + + This function can be used in case a signal may arrive several times. + + """ + self.called = False + + def _HandleSignal(self, signum, frame): + """Actual signal handling function. + + """ + # This is not nice and not absolutely atomic, but it appears to be the only + # solution in Python -- there are no atomic types. + self.called = True