Add daemon for instance import and export
[ganeti-local] / lib / backend.py
index 2569532..1271681 100644 (file)
@@ -23,6 +23,8 @@
 
 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
      the L{UploadFile} function
+@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
+     in the L{_CleanDirectory} function
 
 """
 
@@ -40,7 +42,6 @@ import time
 import stat
 import errno
 import re
-import subprocess
 import random
 import logging
 import tempfile
@@ -58,6 +59,15 @@ from ganeti import ssconf
 
 
 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
+_ALLOWED_CLEAN_DIRS = frozenset([
+  constants.DATA_DIR,
+  constants.JOB_QUEUE_ARCHIVE_DIR,
+  constants.QUEUE_DIR,
+  constants.CRYPTO_KEYS_DIR,
+  ])
+_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
+_X509_KEY_FILE = "key"
+_X509_CERT_FILE = "cert"
 
 
 class RPCFail(Exception):
@@ -144,6 +154,10 @@ def _CleanDirectory(path, exclude=None):
       to the empty list
 
   """
+  if path not in _ALLOWED_CLEAN_DIRS:
+    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
+          path)
+
   if not os.path.isdir(path):
     return
   if exclude is None:
@@ -153,7 +167,7 @@ def _CleanDirectory(path, exclude=None):
     exclude = [os.path.normpath(i) for i in exclude]
 
   for rel_name in utils.ListVisibleFiles(path):
-    full_name = os.path.normpath(os.path.join(path, rel_name))
+    full_name = utils.PathJoin(path, rel_name)
     if full_name in exclude:
       continue
     if os.path.isfile(full_name) and not os.path.islink(full_name):
@@ -173,7 +187,7 @@ def _BuildUploadFileList():
     constants.VNC_PASSWORD_FILE,
     constants.RAPI_CERT_FILE,
     constants.RAPI_USERS_FILE,
-    constants.HMAC_CLUSTER_KEY,
+    constants.CONFD_HMAC_KEY,
     ])
 
   for hv_name in constants.HYPER_TYPES:
@@ -375,6 +389,7 @@ def LeaveCluster(modify_ssh_setup):
 
   """
   _CleanDirectory(constants.DATA_DIR)
+  _CleanDirectory(constants.CRYPTO_KEYS_DIR)
   JobQueuePurge()
 
   if modify_ssh_setup:
@@ -389,9 +404,9 @@ def LeaveCluster(modify_ssh_setup):
       logging.exception("Error while processing ssh files")
 
   try:
-    utils.RemoveFile(constants.HMAC_CLUSTER_KEY)
+    utils.RemoveFile(constants.CONFD_HMAC_KEY)
     utils.RemoveFile(constants.RAPI_CERT_FILE)
-    utils.RemoveFile(constants.SSL_CERT_FILE)
+    utils.RemoveFile(constants.NODED_CERT_FILE)
   except: # pylint: disable-msg=W0702
     logging.exception("Error while removing cluster secrets")
 
@@ -470,7 +485,11 @@ def VerifyNode(what, cluster_name):
   if constants.NV_HYPERVISOR in what:
     result[constants.NV_HYPERVISOR] = tmp = {}
     for hv_name in what[constants.NV_HYPERVISOR]:
-      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
+      try:
+        val = hypervisor.GetHypervisor(hv_name).Verify()
+      except errors.HypervisorError, err:
+        val = "Error while checking hypervisor: %s" % str(err)
+      tmp[hv_name] = val
 
   if constants.NV_FILELIST in what:
     result[constants.NV_FILELIST] = utils.FingerprintFiles(
@@ -510,11 +529,19 @@ def VerifyNode(what, cluster_name):
                        " and ".join(fail))
 
   if constants.NV_LVLIST in what:
-    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
+    try:
+      val = GetVolumeList(what[constants.NV_LVLIST])
+    except RPCFail, err:
+      val = str(err)
+    result[constants.NV_LVLIST] = val
 
   if constants.NV_INSTANCELIST in what:
-    result[constants.NV_INSTANCELIST] = GetInstanceList(
-      what[constants.NV_INSTANCELIST])
+    # GetInstanceList can fail
+    try:
+      val = GetInstanceList(what[constants.NV_INSTANCELIST])
+    except RPCFail, err:
+      val = str(err)
+    result[constants.NV_INSTANCELIST] = val
 
   if constants.NV_VGLIST in what:
     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
@@ -640,21 +667,23 @@ def NodeVolumes():
           result.output)
 
   def parse_dev(dev):
-    if '(' in dev:
-      return dev.split('(')[0]
-    else:
-      return dev
+    return dev.split('(')[0]
+
+  def handle_dev(dev):
+    return [parse_dev(x) for x in dev.split(",")]
 
   def map_line(line):
-    return {
-      'name': line[0].strip(),
-      'size': line[1].strip(),
-      'dev': parse_dev(line[2].strip()),
-      'vg': line[3].strip(),
-    }
+    line = [v.strip() for v in line]
+    return [{'name': line[0], 'size': line[1],
+             'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
 
-  return [map_line(line.split('|')) for line in result.stdout.splitlines()
-          if line.count('|') >= 3]
+  all_devs = []
+  for line in result.stdout.splitlines():
+    if line.count('|') >= 3:
+      all_devs.extend(map_line(line.split('|')))
+    else:
+      logging.warning("Strange line in the output from lvs: '%s'", line)
+  return all_devs
 
 
 def BridgesExist(bridges_list):
@@ -789,24 +818,44 @@ def GetAllInstancesInfo(hypervisor_list):
   return output
 
 
-def InstanceOsAdd(instance, reinstall):
+def _InstanceLogName(kind, os_name, instance):
+  """Compute the OS log filename for a given instance and operation.
+
+  The instance name and os name are passed in as strings since not all
+  operations have these as part of an instance object.
+
+  @type kind: string
+  @param kind: the operation type (e.g. add, import, etc.)
+  @type os_name: string
+  @param os_name: the os name
+  @type instance: string
+  @param instance: the name of the instance being imported/added/etc.
+
+  """
+  base = ("%s-%s-%s-%s.log" %
+          (kind, os_name, instance, utils.TimestampForFilename()))
+  return utils.PathJoin(constants.LOG_OS_DIR, base)
+
+
+def InstanceOsAdd(instance, reinstall, debug):
   """Add an OS to an instance.
 
   @type instance: L{objects.Instance}
   @param instance: Instance whose OS is to be installed
   @type reinstall: boolean
   @param reinstall: whether this is an instance reinstall
+  @type debug: integer
+  @param debug: debug level, passed to the OS scripts
   @rtype: None
 
   """
   inst_os = OSFromDisk(instance.os)
 
-  create_env = OSEnvironment(instance, inst_os)
+  create_env = OSEnvironment(instance, inst_os, debug)
   if reinstall:
     create_env['INSTANCE_REINSTALL'] = "1"
 
-  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
-                                     instance.name, int(time.time()))
+  logfile = _InstanceLogName("add", instance.os, instance.name)
 
   result = utils.RunCmd([inst_os.create_script], env=create_env,
                         cwd=inst_os.path, output=logfile,)
@@ -820,25 +869,26 @@ def InstanceOsAdd(instance, reinstall):
           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
 
 
-def RunRenameInstance(instance, old_name):
+def RunRenameInstance(instance, old_name, debug):
   """Run the OS rename script for an instance.
 
   @type instance: L{objects.Instance}
   @param instance: Instance whose OS is to be installed
   @type old_name: string
   @param old_name: previous instance name
+  @type debug: integer
+  @param debug: debug level, passed to the OS scripts
   @rtype: boolean
   @return: the success of the operation
 
   """
   inst_os = OSFromDisk(instance.os)
 
-  rename_env = OSEnvironment(instance, inst_os)
+  rename_env = OSEnvironment(instance, inst_os, debug)
   rename_env['OLD_INSTANCE_NAME'] = old_name
 
-  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
-                                           old_name,
-                                           instance.name, int(time.time()))
+  logfile = _InstanceLogName("rename", instance.os,
+                             "%s-%s" % (old_name, instance.name))
 
   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                         cwd=inst_os.path, output=logfile)
@@ -884,7 +934,7 @@ def _GetVGInfo(vg_name):
         "vg_free": int(round(float(valarr[1]), 0)),
         "pv_count": int(valarr[2]),
         }
-    except ValueError, err:
+    except (TypeError, ValueError), err:
       logging.exception("Fail to parse vgs output: %s", err)
   else:
     logging.error("vgs output has the wrong number of fields (expected"
@@ -893,8 +943,8 @@ def _GetVGInfo(vg_name):
 
 
 def _GetBlockDevSymlinkPath(instance_name, idx):
-  return os.path.join(constants.DISK_LINKS_DIR,
-                      "%s:%d" % (instance_name, idx))
+  return utils.PathJoin(constants.DISK_LINKS_DIR,
+                        "%s:%d" % (instance_name, idx))
 
 
 def _SymlinkBlockDev(instance_name, device_path, idx):
@@ -1431,6 +1481,8 @@ def BlockdevRemovechildren(parent_cdev, new_cdevs):
       else:
         devs.append(bd.dev_path)
     else:
+      if not utils.IsNormAbsPath(rpath):
+        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
       devs.append(rpath)
   parent_bdev.RemoveChildren(devs)
 
@@ -1479,6 +1531,22 @@ def _RecursiveFindBD(disk):
   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
 
 
+def _OpenRealBD(disk):
+  """Opens the underlying block device of a disk.
+
+  @type disk: L{objects.Disk}
+  @param disk: the disk object we want to open
+
+  """
+  real_disk = _RecursiveFindBD(disk)
+  if real_disk is None:
+    _Fail("Block device '%s' is not set up", disk)
+
+  real_disk.Open()
+
+  return real_disk
+
+
 def BlockdevFind(disk):
   """Check if a device is activated.
 
@@ -1542,11 +1610,7 @@ def BlockdevExport(disk, dest_node, dest_path, cluster_name):
   @rtype: None
 
   """
-  real_disk = _RecursiveFindBD(disk)
-  if real_disk is None:
-    _Fail("Block device '%s' is not set up", disk)
-
-  real_disk.Open()
+  real_disk = _OpenRealBD(disk)
 
   # the block size on the read dd is 1MiB to match our units
   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
@@ -1652,7 +1716,7 @@ def _OSOndiskAPIVersion(os_dir):
       data holding either the vaid versions or an error message
 
   """
-  api_file = os.path.sep.join([os_dir, constants.OS_API_FILE])
+  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
 
   try:
     st = os.stat(api_file)
@@ -1708,7 +1772,7 @@ def DiagnoseOS(top_dirs=None):
         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
         break
       for name in f_names:
-        os_path = os.path.sep.join([dir_name, name])
+        os_path = utils.PathJoin(dir_name, name)
         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
         if status:
           diagnose = ""
@@ -1759,7 +1823,7 @@ def _TryOSFromDisk(name, base_dir=None):
     os_files[constants.OS_VARIANTS_FILE] = ''
 
   for filename in os_files:
-    os_files[filename] = os.path.sep.join([os_dir, filename])
+    os_files[filename] = utils.PathJoin(os_dir, filename)
 
     try:
       st = os.stat(os_files[filename])
@@ -1856,11 +1920,7 @@ def OSEnvironment(instance, inst_os, debug=0):
       variant = inst_os.supported_variants[0]
     result['OS_VARIANT'] = variant
   for idx, disk in enumerate(instance.disks):
-    real_disk = _RecursiveFindBD(disk)
-    if real_disk is None:
-      raise errors.BlockDeviceError("Block device '%s' is not set up" %
-                                    str(disk))
-    real_disk.Open()
+    real_disk = _OpenRealBD(disk)
     result['DISK_%d_PATH' % idx] = real_disk.dev_path
     result['DISK_%d_ACCESS' % idx] = disk.mode
     if constants.HV_DISK_TYPE in instance.hvparams:
@@ -1890,6 +1950,7 @@ def OSEnvironment(instance, inst_os, debug=0):
 
   return result
 
+
 def BlockdevGrow(disk, amount):
   """Grow a stack of block devices.
 
@@ -1926,19 +1987,15 @@ def BlockdevSnapshot(disk):
   @return: snapshot disk path
 
   """
-  if disk.children:
-    if len(disk.children) == 1:
-      # only one child, let's recurse on it
-      return BlockdevSnapshot(disk.children[0])
-    else:
-      # more than one child, choose one that matches
-      for child in disk.children:
-        if child.size == disk.size:
-          # return implies breaking the loop
-          return BlockdevSnapshot(child)
+  if disk.dev_type == constants.LD_DRBD8:
+    if not disk.children:
+      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
+            disk.unique_id)
+    return BlockdevSnapshot(disk.children[0])
   elif disk.dev_type == constants.LD_LV:
     r_dev = _RecursiveFindBD(disk)
     if r_dev is not None:
+      # FIXME: choose a saner value for the snapshot size
       # let's stay on the safe side and ask for the full size, for now
       return r_dev.Snapshot(disk.size)
     else:
@@ -1948,7 +2005,7 @@ def BlockdevSnapshot(disk):
           disk.unique_id, disk.dev_type)
 
 
-def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
+def ExportSnapshot(disk, dest_node, instance, cluster_name, idx, debug):
   """Export a block device snapshot to a remote node.
 
   @type disk: L{objects.Disk}
@@ -1962,28 +2019,26 @@ def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
   @type idx: int
   @param idx: the index of the disk in the instance's disk list,
       used to export to the OS scripts environment
+  @type debug: integer
+  @param debug: debug level, passed to the OS scripts
   @rtype: None
 
   """
   inst_os = OSFromDisk(instance.os)
-  export_env = OSEnvironment(instance, inst_os)
+  export_env = OSEnvironment(instance, inst_os, debug)
 
   export_script = inst_os.export_script
 
-  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
-                                     instance.name, int(time.time()))
+  logfile = _InstanceLogName("export", inst_os.name, instance.name)
   if not os.path.exists(constants.LOG_OS_DIR):
     os.mkdir(constants.LOG_OS_DIR, 0750)
-  real_disk = _RecursiveFindBD(disk)
-  if real_disk is None:
-    _Fail("Block device '%s' is not set up", disk)
 
-  real_disk.Open()
+  real_disk = _OpenRealBD(disk)
 
   export_env['EXPORT_DEVICE'] = real_disk.dev_path
   export_env['EXPORT_INDEX'] = str(idx)
 
-  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
+  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
   destfile = disk.physical_id[1]
 
   # the target command is built out of three individual commands,
@@ -1994,8 +2049,8 @@ def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
 
   comprcmd = "gzip"
 
-  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
-                                destdir, destdir, destfile)
+  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s",
+                                destdir, utils.PathJoin(destdir, destfile))
   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                    constants.GANETI_RUNAS,
                                                    destcmd)
@@ -2023,8 +2078,8 @@ def FinalizeExport(instance, snap_disks):
   @rtype: None
 
   """
-  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
-  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
+  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
+  finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
 
   config = objects.SerializableConfigParser()
 
@@ -2042,6 +2097,7 @@ def FinalizeExport(instance, snap_disks):
   config.set(constants.INISECT_INS, 'vcpus', '%d' %
              instance.beparams[constants.BE_VCPUS])
   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
+  config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)
 
   nic_total = 0
   for nic_count, nic in enumerate(instance.nics):
@@ -2049,8 +2105,9 @@ def FinalizeExport(instance, snap_disks):
     config.set(constants.INISECT_INS, 'nic%d_mac' %
                nic_count, '%s' % nic.mac)
     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
-    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
-               '%s' % nic.bridge)
+    for param in constants.NICS_PARAMETER_TYPES:
+      config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
+                 '%s' % nic.nicparams.get(param, None))
   # TODO: redundant: on load can read nics until it doesn't exist
   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
 
@@ -2067,9 +2124,20 @@ def FinalizeExport(instance, snap_disks):
 
   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
 
-  utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
+  # New-style hypervisor/backend parameters
+
+  config.add_section(constants.INISECT_HYP)
+  for name, value in instance.hvparams.items():
+    if name not in constants.HVC_GLOBALS:
+      config.set(constants.INISECT_HYP, name, str(value))
+
+  config.add_section(constants.INISECT_BEP)
+  for name, value in instance.beparams.items():
+    config.set(constants.INISECT_BEP, name, str(value))
+
+  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                   data=config.Dumps())
-  shutil.rmtree(finaldestdir, True)
+  shutil.rmtree(finaldestdir, ignore_errors=True)
   shutil.move(destdir, finaldestdir)
 
 
@@ -2084,7 +2152,7 @@ def ExportInfo(dest):
       export info
 
   """
-  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
+  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
 
   config = objects.SerializableConfigParser()
   config.read(cff)
@@ -2096,7 +2164,7 @@ def ExportInfo(dest):
   return config.Dumps()
 
 
-def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
+def ImportOSIntoInstance(instance, src_node, src_images, cluster_name, debug):
   """Import an os image into an instance.
 
   @type instance: L{objects.Instance}
@@ -2105,16 +2173,17 @@ def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
   @param src_node: source node for the disk images
   @type src_images: list of string
   @param src_images: absolute paths of the disk images
+  @type debug: integer
+  @param debug: debug level, passed to the OS scripts
   @rtype: list of boolean
   @return: each boolean represent the success of importing the n-th disk
 
   """
   inst_os = OSFromDisk(instance.os)
-  import_env = OSEnvironment(instance, inst_os)
+  import_env = OSEnvironment(instance, inst_os, debug)
   import_script = inst_os.import_script
 
-  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
-                                        instance.name, int(time.time()))
+  logfile = _InstanceLogName("import", instance.os, instance.name)
   if not os.path.exists(constants.LOG_OS_DIR):
     os.mkdir(constants.LOG_OS_DIR, 0750)
 
@@ -2165,7 +2234,7 @@ def RemoveExport(export):
   @rtype: None
 
   """
-  target = os.path.join(constants.EXPORT_DIR, export)
+  target = utils.PathJoin(constants.EXPORT_DIR, export)
 
   try:
     shutil.rmtree(target)
@@ -2227,10 +2296,12 @@ def _TransformFileStorageDir(file_storage_dir):
   @return: the normalized path if valid, None otherwise
 
   """
+  if not constants.ENABLE_FILE_STORAGE:
+    _Fail("File storage disabled at configure time")
   cfg = _GetConfig()
   file_storage_dir = os.path.normpath(file_storage_dir)
   base_file_storage_dir = cfg.GetFileStorageDir()
-  if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
+  if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
       base_file_storage_dir):
     _Fail("File storage directory '%s' is not under base file"
           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
@@ -2467,6 +2538,65 @@ def DemoteFromMC():
   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
 
 
+def _GetX509Filenames(cryptodir, name):
+  """Returns the full paths for the private key and certificate.
+
+  """
+  return (utils.PathJoin(cryptodir, name),
+          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
+          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
+
+
+def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
+  """Creates a new X509 certificate for SSL/TLS.
+
+  @type validity: int
+  @param validity: Validity in seconds
+  @rtype: tuple; (string, string)
+  @return: Certificate name and public part
+
+  """
+  (key_pem, cert_pem) = \
+    utils.GenerateSelfSignedX509Cert(utils.HostInfo.SysName(),
+                                     min(validity, _MAX_SSL_CERT_VALIDITY))
+
+  cert_dir = tempfile.mkdtemp(dir=cryptodir,
+                              prefix="x509-%s-" % utils.TimestampForFilename())
+  try:
+    name = os.path.basename(cert_dir)
+    assert len(name) > 5
+
+    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
+
+    utils.WriteFile(key_file, mode=0400, data=key_pem)
+    utils.WriteFile(cert_file, mode=0400, data=cert_pem)
+
+    # Never return private key as it shouldn't leave the node
+    return (name, cert_pem)
+  except Exception:
+    shutil.rmtree(cert_dir, ignore_errors=True)
+    raise
+
+
+def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
+  """Removes a X509 certificate.
+
+  @type name: string
+  @param name: Certificate name
+
+  """
+  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
+
+  utils.RemoveFile(key_file)
+  utils.RemoveFile(cert_file)
+
+  try:
+    os.rmdir(cert_dir)
+  except EnvironmentError, err:
+    _Fail("Cannot remove certificate directory '%s': %s",
+          cert_dir, err)
+
+
 def _FindDisks(nodes_ip, disks):
   """Sets the physical ID on disks and returns the block devices.
 
@@ -2633,56 +2763,6 @@ class HooksRunner(object):
     # constant
     self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
 
-  @staticmethod
-  def ExecHook(script, env):
-    """Exec one hook script.
-
-    @type script: str
-    @param script: the full path to the script
-    @type env: dict
-    @param env: the environment with which to exec the script
-    @rtype: tuple (success, message)
-    @return: a tuple of success and message, where success
-        indicates the succes of the operation, and message
-        which will contain the error details in case we
-        failed
-
-    """
-    # exec the process using subprocess and log the output
-    fdstdin = None
-    try:
-      fdstdin = open("/dev/null", "r")
-      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
-                               stderr=subprocess.STDOUT, close_fds=True,
-                               shell=False, cwd="/", env=env)
-      output = ""
-      try:
-        output = child.stdout.read(4096)
-        child.stdout.close()
-      except EnvironmentError, err:
-        output += "Hook script error: %s" % str(err)
-
-      while True:
-        try:
-          result = child.wait()
-          break
-        except EnvironmentError, err:
-          if err.errno == errno.EINTR:
-            continue
-          raise
-    finally:
-      # try not to leak fds
-      for fd in (fdstdin, ):
-        if fd is not None:
-          try:
-            fd.close()
-          except EnvironmentError, err:
-            # just log the error
-            #logging.exception("Error while closing fd %s", fd)
-            pass
-
-    return result == 0, utils.SafeEncode(output.strip())
-
   def RunHooks(self, hpath, phase, env):
     """Run the scripts in the hooks directory.
 
@@ -2712,34 +2792,35 @@ class HooksRunner(object):
     else:
       _Fail("Unknown hooks phase '%s'", phase)
 
-    rr = []
 
     subdir = "%s-%s.d" % (hpath, suffix)
-    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
-    try:
-      dir_contents = utils.ListVisibleFiles(dir_name)
-    except OSError:
-      # FIXME: must log output in case of failures
-      return rr
-
-    # we use the standard python sort order,
-    # so 00name is the recommended naming scheme
-    dir_contents.sort()
-    for relname in dir_contents:
-      fname = os.path.join(dir_name, relname)
-      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
-              constants.EXT_PLUGIN_MASK.match(relname) is not None):
+    dir_name = utils.PathJoin(self._BASE_DIR, subdir)
+
+    results = []
+
+    if not os.path.isdir(dir_name):
+      # for non-existing/non-dirs, we simply exit instead of logging a
+      # warning at every operation
+      return results
+
+    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
+
+    for (relname, relstatus, runresult)  in runparts_results:
+      if relstatus == constants.RUNPARTS_SKIP:
         rrval = constants.HKR_SKIP
         output = ""
-      else:
-        result, output = self.ExecHook(fname, env)
-        if not result:
+      elif relstatus == constants.RUNPARTS_ERR:
+        rrval = constants.HKR_FAIL
+        output = "Hook script execution error: %s" % runresult
+      elif relstatus == constants.RUNPARTS_RUN:
+        if runresult.failed:
           rrval = constants.HKR_FAIL
         else:
           rrval = constants.HKR_SUCCESS
-      rr.append(("%s/%s" % (subdir, relname), rrval, output))
+        output = utils.SafeEncode(runresult.output.strip())
+      results.append(("%s/%s" % (subdir, relname), rrval, output))
 
-    return rr
+    return results
 
 
 class IAllocatorRunner(object):
@@ -2806,7 +2887,7 @@ class DevCacheManager(object):
     if dev_path.startswith(cls._DEV_PREFIX):
       dev_path = dev_path[len(cls._DEV_PREFIX):]
     dev_path = dev_path.replace("/", "_")
-    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
+    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
     return fpath
 
   @classmethod