Fix some bugs and some other coding issues found using pylint
mpoints = []
for entry in self._read_fstable('/proc/mounts'):
if entry.mpoint.startswith(os.path.abspath(target)):
mpoints = []
for entry in self._read_fstable('/proc/mounts'):
if entry.mpoint.startswith(os.path.abspath(target)):
- mpoints.append(entry.mpoint)
+ mpoints.append(entry.mpoint)
mpoints.sort()
for mpoint in reversed(mpoints):
mpoints.sort()
for mpoint in reversed(mpoints):
continue
dirname = mpoint
continue
dirname = mpoint
found_ancestor = False
while dirname != '/':
found_ancestor = False
while dirname != '/':
- (dirname, basename) = os.path.split(dirname)
+ (dirname, _) = os.path.split(dirname)
if dirname in excluded:
found_ancestor = True
break
if dirname in excluded:
found_ancestor = True
break
("File injection", ["EnforcePersonality"], ["windows", "linux"])
]
("File injection", ["EnforcePersonality"], ["windows", "linux"])
]
+SYSPREP_PARAM_MAXLEN = 20
class MetadataMonitor(object):
"""Monitors image metadata chages"""
class MetadataMonitor(object):
"""Monitors image metadata chages"""
if len(Kamaki.get_clouds()):
default_item = "Cloud"
else:
if len(Kamaki.get_clouds()):
default_item = "Cloud"
else:
- default_time = "Add/Edit"
+ default_item = "Add/Edit"
- default_time = "Delete"
+ default_item = "Delete"
elif choice == "Cloud":
default_item = "Cloud"
clouds = Kamaki.get_clouds()
elif choice == "Cloud":
default_item = "Cloud"
clouds = Kamaki.get_clouds()
def sysprep_params(session):
def sysprep_params(session):
+ """Collect the needed sysprep parameters"""
d = session['dialog']
image = session['image']
d = session['dialog']
image = session['image']
for name in names:
param = needed[name]
default = available[name] if name in available else ""
for name in names:
param = needed[name]
default = available[name] if name in available else ""
- fields.append(("%s: " % param.description, default, param.maxlen))
+ fields.append(("%s: " % param.description, default,
+ SYSPREP_PARAM_MAXLEN))
txt = "Please provide the following system preparation parameters:"
code, output = d.form(txt, height=13, width=WIDTH, form_height=len(fields),
txt = "Please provide the following system preparation parameters:"
code, output = d.form(txt, height=13, width=WIDTH, form_height=len(fields),
if code in (d.DIALOG_CANCEL, d.DIALOG_ESC):
return False
if code in (d.DIALOG_CANCEL, d.DIALOG_ESC):
return False
for i in range(len(fields)):
param = needed[names[i]]
if param.validator(output[i]):
image.os.sysprep_params[names[i]] = output[i]
else:
d.msgbox("The value you provided for parameter: %s is not valid" %
for i in range(len(fields)):
param = needed[names[i]]
if param.validator(output[i]):
image.os.sysprep_params[names[i]] = output[i]
else:
d.msgbox("The value you provided for parameter: %s is not valid" %
- name, width=SMALL_WIDTH)
+ names[i], width=SMALL_WIDTH)
PAGE_WIDTH = 70
PAGE_HEIGHT = 10
PAGE_WIDTH = 70
PAGE_HEIGHT = 10
+SYSPREP_PARAM_MAXLEN = 20
class WizardExit(Exception):
class WizardExit(Exception):
field_lenght = len(self.fields())
form_height = field_lenght if field_lenght < PAGE_HEIGHT - 4 \
field_lenght = len(self.fields())
form_height = field_lenght if field_lenght < PAGE_HEIGHT - 4 \
(code, output) = d.form(
self.text, width=PAGE_WIDTH, height=PAGE_HEIGHT,
(code, output) = d.form(
self.text, width=PAGE_WIDTH, height=PAGE_HEIGHT,
for name in param_names:
text = needed[name].description
default = available[name] if name in available else ""
for name in param_names:
text = needed[name].description
default = available[name] if name in available else ""
- fields.append(("%s: " % text, default, needed[name].maxlen))
+ fields.append(("%s: " % text, default, SYSPREP_PARAM_MAXLEN))
return fields
def sysprep_params_validate(answer):
return fields
def sysprep_params_validate(answer):
def add_prefix(target):
def wrapper(self, *args):
prefix = args[0]
def add_prefix(target):
def wrapper(self, *args):
prefix = args[0]
- return map(lambda x: prefix + x, target(self, *args))
+ return [prefix + path for path in target(self, *args)]
return wrapper
def sysprep(message, enabled=True, **kwargs):
"""Decorator for system preparation tasks"""
return wrapper
def sysprep(message, enabled=True, **kwargs):
"""Decorator for system preparation tasks"""
- def wrapper(func):
- func.sysprep = True
- func.enabled = enabled
- func.executed = False
+ def wrapper(method):
+ method.sysprep = True
+ method.enabled = enabled
+ method.executed = False
for key, val in kwargs.items():
for key, val in kwargs.items():
- setattr(func, key, val)
+ setattr(method, key, val)
def inner(self, print_message=True):
if print_message:
self.out.output(message)
def inner(self, print_message=True):
if print_message:
self.out.output(message)
-def add_sysprep_param(name, descr, maxlen, default=None,
- validator=lambda x: True):
- """Decorator for init that adds the definition for a system preparation
- parameter
+def add_sysprep_param(name, default, description, validator=lambda x: True):
+ """Decorator for __init__ that adds the definition for a system preparation
+ parameter in an instance of an os_type class
- def wrapper(func):
-
- @wraps(func)
+ def wrapper(init):
+ @wraps(init)
def inner(self, *args, **kwargs):
def inner(self, *args, **kwargs):
-
- func(self, *args, **kwargs)
-
- if not hasattr(self, 'needed_sysprep_params'):
- self.needed_sysprep_params = {}
- getattr(self, 'needed_sysprep_params')[name] = \
- self.SysprepParam(descr, maxlen, validator)
+ init(self, *args, **kwargs)
+ self.needed_sysprep_params[name] = \
+ self.SysprepParam(default, description, validator)
return wrapper
def del_sysprep_param(name):
return wrapper
def del_sysprep_param(name):
- """Decorator for init that deletes a previously added sysprep parameter
- definition .
+ """Decorator for __init__ that deletes a previously added sysprep parameter
+ definition from an instance of an os_type class.
@wraps(func)
def inner(self, *args, **kwargs):
@wraps(func)
def inner(self, *args, **kwargs):
- del self.needed_sysprep_params[nam]
+ del self.needed_sysprep_params[name]
func(self, *args, **kwargs)
func(self, *args, **kwargs)
return wrapper
class OSBase(object):
"""Basic operating system class"""
return wrapper
class OSBase(object):
"""Basic operating system class"""
- SysprepParam = namedtuple('SysprepParam', 'description maxlen validator')
+ SysprepParam = namedtuple('SysprepParam', 'default description validator')
def __init__(self, image, **kargs):
self.image = image
def __init__(self, image, **kargs):
self.image = image
self.g = image.g
self.out = image.out
self.g = image.g
self.out = image.out
+ self.needed_sysprep_params = {}
self.sysprep_params = \
kargs['sysprep_params'] if 'sysprep_params' in kargs else {}
self.meta = {}
self.sysprep_params = \
kargs['sysprep_params'] if 'sysprep_params' in kargs else {}
self.meta = {}
def collect_metadata(self):
"""Collect metadata about the OS"""
def collect_metadata(self):
"""Collect metadata about the OS"""
"""Print enabled and disabled system preparation operations."""
syspreps = self.list_syspreps()
"""Print enabled and disabled system preparation operations."""
syspreps = self.list_syspreps()
- enabled = filter(lambda x: x.enabled, syspreps)
- disabled = filter(lambda x: not x.enabled, syspreps)
+ enabled = [sysprep for sysprep in syspreps if sysprep.enabled]
+ disabled = [sysprep for sysprep in syspreps if not sysprep.enabled]
wrapper = textwrap.TextWrapper()
wrapper.subsequent_indent = '\t'
wrapper = textwrap.TextWrapper()
wrapper.subsequent_indent = '\t'
self.out.output("Needed system preparation parameters:")
self.out.output("Needed system preparation parameters:")
- params = self.needed_sysprep_params()
-
- if len(params) == 0:
+ if len(self.needed_sysprep_params) == 0:
self.out.output("(none)")
return
self.out.output("(none)")
return
+ for name, param in self.needed_sysprep_params.items():
self.out.output("\t%s (%s): %s" %
self.out.output("\t%s (%s): %s" %
- (param.description, param.name,
- self.sysprep_params[param.name] if param.name in
+ (param.description, name,
+ self.sysprep_params[name] if name in
self.sysprep_params else "(none)"))
def do_sysprep(self):
self.sysprep_params else "(none)"))
def do_sysprep(self):
self.out.output('Preparing system for image creation:')
self.out.output('Preparing system for image creation:')
- tasks = self.list_syspreps()
- enabled = filter(lambda x: x.enabled, tasks)
+ enabled = [task for task in self.list_syspreps() if task.enabled]
size = len(enabled)
cnt = 0
size = len(enabled)
cnt = 0
# libguestfs can't handle correct freebsd partitions on a GUID
# Partition Table. We have to do the translation to linux device names
# ourselves
# libguestfs can't handle correct freebsd partitions on a GUID
# Partition Table. We have to do the translation to linux device names
# ourselves
- guid_device = re.compile('^/dev/((?:ada)|(?:vtbd))(\d+)p(\d+)$')
+ guid_device = re.compile(r'^/dev/((?:ada)|(?:vtbd))(\d+)p(\d+)$')
mopts = "ufstype=ufs2,%s" % ('ro' if readonly else 'rw')
for mp, dev in self._mountpoints():
mopts = "ufstype=ufs2,%s" % ('ro' if readonly else 'rw')
for mp, dev in self._mountpoints():
def _get_passworded_users(self):
"""Returns a list of non-locked user accounts"""
users = []
def _get_passworded_users(self):
"""Returns a list of non-locked user accounts"""
users = []
- regexp = re.compile('(\S+):((?:!\S+)|(?:[^!*]\S+)|):(?:\S*:){6}')
+ regexp = re.compile(r'(\S+):((?:!\S+)|(?:[^!*]\S+)|):(?:\S*:){6}')
for line in self.g.cat('/etc/shadow').splitlines():
match = regexp.match(line)
for line in self.g.cat('/etc/shadow').splitlines():
match = regexp.match(line)
"""This module hosts OS-specific code common to all Unix-like OSs."""
"""This module hosts OS-specific code common to all Unix-like OSs."""
from image_creator.os_type import OSBase, sysprep
from image_creator.os_type import OSBase, sysprep
- @sysprep('Removing files u)nder /var/cache')
+ @sysprep('Removing files under /var/cache')
def cleanup_cache(self):
"""Remove all regular files under /var/cache"""
def cleanup_cache(self):
"""Remove all regular files under /var/cache"""
Windows OSs."""
from image_creator.os_type import OSBase, sysprep, add_sysprep_param
Windows OSs."""
from image_creator.os_type import OSBase, sysprep, add_sysprep_param
-from image_creator.util import FatalError, check_guestfs_version, get_command
+from image_creator.util import FatalError, check_guestfs_version
from image_creator.winexe import WinEXE, WinexeTimeout
import hivex
from image_creator.winexe import WinEXE, WinexeTimeout
import hivex
class Windows(OSBase):
"""OS class for Windows"""
class Windows(OSBase):
"""OS class for Windows"""
- @add_sysprep_param('password', 'Image Administrator Password', 20)
+ @add_sysprep_param('password', None, 'Image Administrator Password')
def __init__(self, image, **kargs):
super(Windows, self).__init__(image, **kargs)
def __init__(self, image, **kargs):
super(Windows, self).__init__(image, **kargs)
assert self.system_drive
self.product_name = self.g.inspect_get_product_name(self.root)
assert self.system_drive
self.product_name = self.g.inspect_get_product_name(self.root)
@sysprep('Disabling IPv6 privacy extensions')
def disable_ipv6_privacy_extensions(self):
@sysprep('Disabling IPv6 privacy extensions')
def disable_ipv6_privacy_extensions(self):
- "cscript \Windows\system32\slmgr.vbs /ipk %s" % setup_key)
+ r"cscript \Windows\system32\slmgr.vbs /ipk %s" % setup_key)
@sysprep('Shrinking the last filesystem')
def shrink(self):
@sysprep('Shrinking the last filesystem')
def shrink(self):
raise FatalError("Image is already syspreped!")
txt = "System preparation parameter: `%s' is needed but missing!"
raise FatalError("Image is already syspreped!")
txt = "System preparation parameter: `%s' is needed but missing!"
- for param in self.needed_sysprep_params:
- if param not in self.sysprep_params:
+ for name, param in self.needed_sysprep_params.items():
+ if name not in self.sysprep_params:
raise FatalError(txt % param)
self.mount(readonly=False)
raise FatalError(txt % param)
self.mount(readonly=False)
# guestfs_shutdown which is the prefered way to shutdown the backend
# process was introduced in version 1.19.16
if check_guestfs_version(self.g, 1, 19, 16) >= 0:
# guestfs_shutdown which is the prefered way to shutdown the backend
# process was introduced in version 1.19.16
if check_guestfs_version(self.g, 1, 19, 16) >= 0:
- ret = self.g.shutdown()
- ret = self.g.kill_subprocess()
+ self.g.kill_subprocess()
self.out.output('Preparing system for image creation:')
tasks = self.list_syspreps()
self.out.output('Preparing system for image creation:')
tasks = self.list_syspreps()
- enabled = filter(lambda x: x.enabled, tasks)
+ enabled = [task for task in tasks if task.enabled]
size = len(enabled)
# Make sure shrink runs in the end, before ms sysprep
size = len(enabled)
# Make sure shrink runs in the end, before ms sysprep
- enabled = filter(lambda x: self.sysprep_info(x).name != 'shrink',
- enabled)
+ enabled = [task for task in enabled if
+ self.sysprep_info(task).name != 'shrink']
if len(enabled) != size:
enabled.append(self.shrink)
if len(enabled) != size:
enabled.append(self.shrink)
# Make sure the ms sysprep is the last task to run if it is enabled
# Make sure the ms sysprep is the last task to run if it is enabled
- enabled = filter(
- lambda x: self.sysprep_info(x).name != 'microsoft-sysprep',
- enabled)
+ enabled = [task for task in enabled if
+ self.sysprep_info(task).name != 'microsoft-sysprep']
ms_sysprep_enabled = False
if len(enabled) != size:
ms_sysprep_enabled = False
if len(enabled) != size:
def _wait_vm_boot(self, vm, fname, msg):
"""Wait until a message appears on a file or the vm process dies"""
def _wait_vm_boot(self, vm, fname, msg):
"""Wait until a message appears on a file or the vm process dies"""
- for i in range(BOOT_TIMEOUT):
+ for _ in range(BOOT_TIMEOUT):
time.sleep(1)
with open(fname) as f:
for line in f:
time.sleep(1)
with open(fname) as f:
for line in f:
path = "%s/system32/config/%s" % (systemroot, regfile)
try:
path = self.g.case_sensitive_path(path)
path = "%s/system32/config/%s" % (systemroot, regfile)
try:
path = self.g.case_sensitive_path(path)
- except RuntimeError as e:
+ except RuntimeError as error:
raise FatalError("Unable to retrieve registry file: %s. Reason: %s"
raise FatalError("Unable to retrieve registry file: %s. Reason: %s"
+ % (regfile, str(error)))
return path
def _enable_os_monitor(self):
return path
def _enable_os_monitor(self):
def _get_users(self):
"""Returns a list of users found in the images"""
def _get_users(self):
"""Returns a list of users found in the images"""
- path = self._registry_file_path('SAM')
samfd, sam = tempfile.mkstemp()
try:
os.close(samfd)
samfd, sam = tempfile.mkstemp()
try:
os.close(samfd)
- self.g.download(path, sam)
+ self.g.download(self._registry_file_path('SAM'), sam)
log.file.write(stdout)
finally:
log.close()
log.file.write(stdout)
finally:
log.close()
- self.out.output("failed! See: `%' for the full output" % log.name)
+ self.out.output("failed! See: `%s' for the full output" % log.name)
if i < CONNECTION_RETRIES - 1:
self.out.output("Retrying ...", False)
raise FatalError("Connection to the VM failed after %d retries" %
if i < CONNECTION_RETRIES - 1:
self.out.output("Retrying ...", False)
raise FatalError("Connection to the VM failed after %d retries" %
self.serial = serial
def random_mac():
self.serial = serial
def random_mac():
+ """Create a random MAC address"""
mac = [0x00, 0x16, 0x3e,
random.randint(0x00, 0x7f),
random.randint(0x00, 0xff),
random.randint(0x00, 0xff)]
mac = [0x00, 0x16, 0x3e,
random.randint(0x00, 0x7f),
random.randint(0x00, 0xff),
random.randint(0x00, 0xff)]
- return ':'.join(map(lambda x: "%02x" % x, mac))
+ return ':'.join(['%02x' % x for x in mac])
# Use ganeti's VNC port range for a random vnc port
self.display = random.randint(11000, 14999) - 5900
# Use ganeti's VNC port range for a random vnc port
self.display = random.randint(11000, 14999) - 5900
if self.isalive():
self.process.kill()
self.process.wait()
if self.isalive():
self.process.kill()
self.process.wait()
- self.out.output("timed-out")
raise FatalError("VM destroy timed-out")
signal.signal(signal.SIGALRM, handler)
raise FatalError("VM destroy timed-out")
signal.signal(signal.SIGALRM, handler)