X-Git-Url: https://code.grnet.gr/git/snf-image-creator/blobdiff_plain/797e0df72ff246b0084ca244deded8fd4972bc6b..a38447ba998e839f4b79aebcbce40a1cc5e2b7d0:/image_creator/os_type/__init__.py?ds=sidebyside diff --git a/image_creator/os_type/__init__.py b/image_creator/os_type/__init__.py index 063b410..5298569 100644 --- a/image_creator/os_type/__init__.py +++ b/image_creator/os_type/__init__.py @@ -41,6 +41,8 @@ from image_creator.util import FatalError import textwrap import re +from collections import namedtuple +from functools import wraps def os_cls(distro, osfamily): @@ -60,33 +62,87 @@ def os_cls(distro, osfamily): def add_prefix(target): + """Decorator that adds a prefix to the result of a function""" def wrapper(self, *args): prefix = args[0] - return map(lambda x: prefix + x, target(self, *args)) + return [prefix + path for path in target(self, *args)] return wrapper -def sysprep(enabled=True): +def sysprep(message, enabled=True, **kwargs): """Decorator for system preparation tasks""" + def wrapper(method): + method.sysprep = True + method.enabled = enabled + method.executed = False + + for key, val in kwargs.items(): + setattr(method, key, val) + + @wraps(method) + def inner(self, print_message=True): + if print_message: + self.out.output(message) + return method(self) + + return inner + return wrapper + + +def add_sysprep_param(name, type, default, descr, validate=lambda x: True): + """Decorator for __init__ that adds the definition for a system preparation + parameter in an instance of a os_type class + """ + def wrapper(init): + @wraps(init) + def inner(self, *args, **kwargs): + init(self, *args, **kwargs) + self.needed_sysprep_params[name] = \ + self.SysprepParam(type, default, descr, validate) + if default is not None: + self.sysprep_params[name] = default + return inner + return wrapper + + +def del_sysprep_param(name): + """Decorator for __init__ that deletes a previously added sysprep parameter + definition from an instance of a os_type class. + """ def wrapper(func): - func.sysprep = True - func.enabled = enabled - func.executed = False - return func + @wraps(func) + def inner(self, *args, **kwargs): + del self.needed_sysprep_params[name] + func(self, *args, **kwargs) + return inner return wrapper class OSBase(object): """Basic operating system class""" - def __init__(self, image): + SysprepParam = namedtuple('SysprepParam', + ['type', 'default', 'description', 'validate']) + + def __init__(self, image, **kargs): self.image = image self.root = image.root - self.g = image.g self.out = image.out + self.needed_sysprep_params = {} + self.sysprep_params = \ + kargs['sysprep_params'] if 'sysprep_params' in kargs else {} + self.meta = {} + self.mounted = False + + # Many guestfs compilations don't support scrub + self._scrub_support = True + try: + self.image.g.available(['scrub']) + except RuntimeError: + self._scrub_support = False def collect_metadata(self): """Collect metadata about the OS""" @@ -113,7 +169,10 @@ class OSBase(object): """Returns information about a sysprep object""" assert self._is_sysprep(obj), "Object is not a sysprep" - return (obj.__name__.replace('_', '-'), textwrap.dedent(obj.__doc__)) + SysprepInfo = namedtuple("SysprepInfo", "name description") + + return SysprepInfo(obj.__name__.replace('_', '-'), + textwrap.dedent(obj.__doc__)) def get_sysprep_by_name(self, name): """Returns the sysprep object with the given name""" @@ -144,8 +203,8 @@ class OSBase(object): """Print enabled and disabled system preparation operations.""" syspreps = self.list_syspreps() - enabled = filter(lambda x: x.enabled, syspreps) - disabled = filter(lambda x: not x.enabled, syspreps) + enabled = [sysprep for sysprep in syspreps if sysprep.enabled] + disabled = [sysprep for sysprep in syspreps if not sysprep.enabled] wrapper = textwrap.TextWrapper() wrapper.subsequent_indent = '\t' @@ -170,6 +229,21 @@ class OSBase(object): descr = wrapper.fill(textwrap.dedent(sysprep.__doc__)) self.out.output(' %s:\n%s\n' % (name, descr)) + def print_sysprep_params(self): + """Print the system preparation parameter the user may use""" + + self.out.output("Needed system preparation parameters:") + + if len(self.needed_sysprep_params) == 0: + self.out.output("(none)") + return + + for name, param in self.needed_sysprep_params.items(): + self.out.output("\t%s (%s): %s" % + (param.description, name, + self.sysprep_params[name] if name in + self.sysprep_params else "(none)")) + def do_sysprep(self): """Prepare system for image creation.""" @@ -179,8 +253,7 @@ class OSBase(object): self.out.output('Preparing system for image creation:') - tasks = self.list_syspreps() - enabled = filter(lambda x: x.enabled, tasks) + enabled = [task for task in self.list_syspreps() if task.enabled] size = len(enabled) cnt = 0 @@ -214,7 +287,7 @@ class OSBase(object): """Umount all mounted filesystems.""" self.out.output("Umounting the media ...", False) - self.g.umount_all() + self.image.g.umount_all() self.mounted = False self.out.success('done') @@ -225,12 +298,12 @@ class OSBase(object): @add_prefix def _ls(self, directory): """List the name of all files under a directory""" - return self.g.ls(directory) + return self.image.g.ls(directory) @add_prefix def _find(self, directory): """List the name of all files recursively under a directory""" - return self.g.find(directory) + return self.image.g.find(directory) def _foreach_file(self, directory, action, **kargs): """Perform an action recursively on all files under a directory. @@ -247,6 +320,10 @@ class OSBase(object): * exclude: Exclude all files that follow this pattern. """ + if not self.image.g.is_dir(directory): + self.out.warn("Directory: `%s' does not exist!" % directory) + return + maxdepth = None if 'maxdepth' not in kargs else kargs['maxdepth'] if maxdepth == 0: return @@ -259,7 +336,7 @@ class OSBase(object): ftype = None if 'ftype' not in kargs else kargs['ftype'] has_ftype = lambda x, y: y is None and True or x['ftyp'] == y - for f in self.g.readdir(directory): + for f in self.image.g.readdir(directory): if f['name'] in ('.', '..'): continue @@ -276,17 +353,20 @@ class OSBase(object): def _do_collect_metadata(self): """helper method for collect_metadata""" - self.meta['ROOT_PARTITION'] = "%d" % self.g.part_to_partnum(self.root) - self.meta['OSFAMILY'] = self.g.inspect_get_type(self.root) - self.meta['OS'] = self.g.inspect_get_distro(self.root) + self.meta['ROOT_PARTITION'] = \ + "%d" % self.image.g.part_to_partnum(self.root) + self.meta['OSFAMILY'] = self.image.g.inspect_get_type(self.root) + self.meta['OS'] = self.image.g.inspect_get_distro(self.root) if self.meta['OS'] == "unknown": self.meta['OS'] = self.meta['OSFAMILY'] - self.meta['DESCRIPTION'] = self.g.inspect_get_product_name(self.root) + self.meta['DESCRIPTION'] = \ + self.image.g.inspect_get_product_name(self.root) def _do_mount(self, readonly): """helper method for mount""" try: - self.g.mount_options('ro' if readonly else 'rw', self.root, '/') + self.image.g.mount_options( + 'ro' if readonly else 'rw', self.root, '/') except RuntimeError as msg: self.out.warn("unable to mount the root partition: %s" % msg) return False