import textwrap
import re
from collections import namedtuple
+from functools import wraps
def os_cls(distro, osfamily):
def add_prefix(target):
+ """Decorator that adds a prefix to the result of a function"""
def wrapper(self, *args):
prefix = args[0]
- return map(lambda x: prefix + x, target(self, *args))
+ return [prefix + path for path in target(self, *args)]
return wrapper
-def sysprep(enabled=True):
+def sysprep(message, enabled=True, **kwargs):
"""Decorator for system preparation tasks"""
+ def wrapper(method):
+ method.sysprep = True
+ method.enabled = enabled
+ method.executed = False
+
+ for key, val in kwargs.items():
+ setattr(method, key, val)
+
+ @wraps(method)
+ def inner(self, print_message=True):
+ if print_message:
+ self.out.output(message)
+ return method(self)
+
+ return inner
+ return wrapper
+
+
+def add_sysprep_param(name, type, default, descr, validate=lambda x: True):
+ """Decorator for __init__ that adds the definition for a system preparation
+ parameter in an instance of a os_type class
+ """
+ def wrapper(init):
+ @wraps(init)
+ def inner(self, *args, **kwargs):
+ init(self, *args, **kwargs)
+ self.needed_sysprep_params[name] = \
+ self.SysprepParam(type, default, descr, validate)
+ if default is not None:
+ self.sysprep_params[name] = default
+ return inner
+ return wrapper
+
+
+def del_sysprep_param(name):
+ """Decorator for __init__ that deletes a previously added sysprep parameter
+ definition from an instance of a os_type class.
+ """
def wrapper(func):
- func.sysprep = True
- func.enabled = enabled
- func.executed = False
- return func
+ @wraps(func)
+ def inner(self, *args, **kwargs):
+ del self.needed_sysprep_params[name]
+ func(self, *args, **kwargs)
+ return inner
return wrapper
"""Basic operating system class"""
SysprepParam = namedtuple('SysprepParam',
- 'name description length validator')
+ ['type', 'default', 'description', 'validate'])
def __init__(self, image, **kargs):
self.image = image
self.root = image.root
- self.g = image.g
self.out = image.out
+ self.needed_sysprep_params = {}
self.sysprep_params = \
kargs['sysprep_params'] if 'sysprep_params' in kargs else {}
self.meta = {}
+ self.mounted = False
+
+ # Many guestfs compilations don't support scrub
+ self._scrub_support = True
+ try:
+ self.image.g.available(['scrub'])
+ except RuntimeError:
+ self._scrub_support = False
def collect_metadata(self):
"""Collect metadata about the OS"""
self.out.output()
- def needed_sysprep_params(self):
- """Returns a list of needed sysprep parameters. Each element in the
- list is a SysprepParam object.
- """
- return []
-
def list_syspreps(self):
"""Returns a list of sysprep objects"""
objs = [getattr(self, name) for name in dir(self)
"""Returns information about a sysprep object"""
assert self._is_sysprep(obj), "Object is not a sysprep"
- return (obj.__name__.replace('_', '-'), textwrap.dedent(obj.__doc__))
+ SysprepInfo = namedtuple("SysprepInfo", "name description")
+
+ return SysprepInfo(obj.__name__.replace('_', '-'),
+ textwrap.dedent(obj.__doc__))
def get_sysprep_by_name(self, name):
"""Returns the sysprep object with the given name"""
"""Print enabled and disabled system preparation operations."""
syspreps = self.list_syspreps()
- enabled = filter(lambda x: x.enabled, syspreps)
- disabled = filter(lambda x: not x.enabled, syspreps)
+ enabled = [sysprep for sysprep in syspreps if sysprep.enabled]
+ disabled = [sysprep for sysprep in syspreps if not sysprep.enabled]
wrapper = textwrap.TextWrapper()
wrapper.subsequent_indent = '\t'
self.out.output("Needed system preparation parameters:")
- params = self.needed_sysprep_params()
-
- if len(params) == 0:
+ if len(self.needed_sysprep_params) == 0:
self.out.output("(none)")
return
- for param in params:
+ for name, param in self.needed_sysprep_params.items():
self.out.output("\t%s (%s): %s" %
- (param.description, param.name,
- self.sysprep_params[param.name] if param.name in
+ (param.description, name,
+ self.sysprep_params[name] if name in
self.sysprep_params else "(none)"))
def do_sysprep(self):
self.out.output('Preparing system for image creation:')
- tasks = self.list_syspreps()
- enabled = filter(lambda x: x.enabled, tasks)
+ enabled = [task for task in self.list_syspreps() if task.enabled]
size = len(enabled)
cnt = 0
"""Umount all mounted filesystems."""
self.out.output("Umounting the media ...", False)
- self.g.umount_all()
+ self.image.g.umount_all()
self.mounted = False
self.out.success('done')
@add_prefix
def _ls(self, directory):
"""List the name of all files under a directory"""
- return self.g.ls(directory)
+ return self.image.g.ls(directory)
@add_prefix
def _find(self, directory):
"""List the name of all files recursively under a directory"""
- return self.g.find(directory)
+ return self.image.g.find(directory)
def _foreach_file(self, directory, action, **kargs):
"""Perform an action recursively on all files under a directory.
ftype = None if 'ftype' not in kargs else kargs['ftype']
has_ftype = lambda x, y: y is None and True or x['ftyp'] == y
- for f in self.g.readdir(directory):
+ for f in self.image.g.readdir(directory):
if f['name'] in ('.', '..'):
continue
def _do_collect_metadata(self):
"""helper method for collect_metadata"""
- self.meta['ROOT_PARTITION'] = "%d" % self.g.part_to_partnum(self.root)
- self.meta['OSFAMILY'] = self.g.inspect_get_type(self.root)
- self.meta['OS'] = self.g.inspect_get_distro(self.root)
+ self.meta['ROOT_PARTITION'] = \
+ "%d" % self.image.g.part_to_partnum(self.root)
+ self.meta['OSFAMILY'] = self.image.g.inspect_get_type(self.root)
+ self.meta['OS'] = self.image.g.inspect_get_distro(self.root)
if self.meta['OS'] == "unknown":
self.meta['OS'] = self.meta['OSFAMILY']
- self.meta['DESCRIPTION'] = self.g.inspect_get_product_name(self.root)
+ self.meta['DESCRIPTION'] = \
+ self.image.g.inspect_get_product_name(self.root)
def _do_mount(self, readonly):
"""helper method for mount"""
try:
- self.g.mount_options('ro' if readonly else 'rw', self.root, '/')
+ self.image.g.mount_options(
+ 'ro' if readonly else 'rw', self.root, '/')
except RuntimeError as msg:
self.out.warn("unable to mount the root partition: %s" % msg)
return False