serializer: Fail if dictionary uses invalid keys
[ganeti-local] / lib / backend.py
index e7c4787..c38380d 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
 #
 #
 
-# Copyright (C) 2006, 2007 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 # 02110-1301, USA.
 
 
 # 02110-1301, USA.
 
 
-"""Functions used by the node daemon"""
+"""Functions used by the node daemon
+
+@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
+     the L{UploadFile} function
+@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
+     in the L{_CleanDirectory} function
+
+"""
+
+# pylint: disable=E1103
+
+# E1103: %s %r has no %r member (but some types could not be
+# inferred), because the _TryOSFromDisk returns either (True, os_obj)
+# or (False, "string") which confuses pylint
 
 
 import os
 
 
 import os
@@ -29,12 +42,12 @@ import time
 import stat
 import errno
 import re
 import stat
 import errno
 import re
-import subprocess
 import random
 import logging
 import tempfile
 import zlib
 import base64
 import random
 import logging
 import tempfile
 import zlib
 import base64
+import signal
 
 from ganeti import errors
 from ganeti import utils
 
 from ganeti import errors
 from ganeti import utils
@@ -44,6 +57,27 @@ from ganeti import constants
 from ganeti import bdev
 from ganeti import objects
 from ganeti import ssconf
 from ganeti import bdev
 from ganeti import objects
 from ganeti import ssconf
+from ganeti import serializer
+from ganeti import netutils
+from ganeti import runtime
+
+
+_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
+_ALLOWED_CLEAN_DIRS = frozenset([
+  constants.DATA_DIR,
+  constants.JOB_QUEUE_ARCHIVE_DIR,
+  constants.QUEUE_DIR,
+  constants.CRYPTO_KEYS_DIR,
+  ])
+_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
+_X509_KEY_FILE = "key"
+_X509_CERT_FILE = "cert"
+_IES_STATUS_FILE = "status"
+_IES_PID_FILE = "pid"
+_IES_CA_FILE = "ca"
+
+#: Valid LVS output line regex
+_LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
 
 
 class RPCFail(Exception):
 
 
 class RPCFail(Exception):
@@ -53,6 +87,7 @@ class RPCFail(Exception):
 
   """
 
 
   """
 
+
 def _Fail(msg, *args, **kwargs):
   """Log an error and the raise an RPCFail exception.
 
 def _Fail(msg, *args, **kwargs):
   """Log an error and the raise an RPCFail exception.
 
@@ -129,6 +164,10 @@ def _CleanDirectory(path, exclude=None):
       to the empty list
 
   """
       to the empty list
 
   """
+  if path not in _ALLOWED_CLEAN_DIRS:
+    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
+          path)
+
   if not os.path.isdir(path):
     return
   if exclude is None:
   if not os.path.isdir(path):
     return
   if exclude is None:
@@ -138,13 +177,42 @@ def _CleanDirectory(path, exclude=None):
     exclude = [os.path.normpath(i) for i in exclude]
 
   for rel_name in utils.ListVisibleFiles(path):
     exclude = [os.path.normpath(i) for i in exclude]
 
   for rel_name in utils.ListVisibleFiles(path):
-    full_name = os.path.normpath(os.path.join(path, rel_name))
+    full_name = utils.PathJoin(path, rel_name)
     if full_name in exclude:
       continue
     if os.path.isfile(full_name) and not os.path.islink(full_name):
       utils.RemoveFile(full_name)
 
 
     if full_name in exclude:
       continue
     if os.path.isfile(full_name) and not os.path.islink(full_name):
       utils.RemoveFile(full_name)
 
 
+def _BuildUploadFileList():
+  """Build the list of allowed upload files.
+
+  This is abstracted so that it's built only once at module import time.
+
+  """
+  allowed_files = set([
+    constants.CLUSTER_CONF_FILE,
+    constants.ETC_HOSTS,
+    constants.SSH_KNOWN_HOSTS_FILE,
+    constants.VNC_PASSWORD_FILE,
+    constants.RAPI_CERT_FILE,
+    constants.SPICE_CERT_FILE,
+    constants.SPICE_CACERT_FILE,
+    constants.RAPI_USERS_FILE,
+    constants.CONFD_HMAC_KEY,
+    constants.CLUSTER_DOMAIN_SECRET_FILE,
+    ])
+
+  for hv_name in constants.HYPER_TYPES:
+    hv_class = hypervisor.GetHypervisorClass(hv_name)
+    allowed_files.update(hv_class.GetAncillaryFiles())
+
+  return frozenset(allowed_files)
+
+
+_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
+
+
 def JobQueuePurge():
   """Removes job queue files and archived jobs.
 
 def JobQueuePurge():
   """Removes job queue files and archived jobs.
 
@@ -163,7 +231,7 @@ def GetMasterInfo():
   for consumption here or from the node daemon.
 
   @rtype: tuple
   for consumption here or from the node daemon.
 
   @rtype: tuple
-  @return: master_netdev, master_ip, master_name
+  @return: master_netdev, master_ip, master_name, primary_ip_family
   @raise RPCFail: in case of errors
 
   """
   @raise RPCFail: in case of errors
 
   """
@@ -172,21 +240,23 @@ def GetMasterInfo():
     master_netdev = cfg.GetMasterNetdev()
     master_ip = cfg.GetMasterIP()
     master_node = cfg.GetMasterNode()
     master_netdev = cfg.GetMasterNetdev()
     master_ip = cfg.GetMasterIP()
     master_node = cfg.GetMasterNode()
+    primary_ip_family = cfg.GetPrimaryIPFamily()
   except errors.ConfigurationError, err:
     _Fail("Cluster configuration incomplete: %s", err, exc=True)
   except errors.ConfigurationError, err:
     _Fail("Cluster configuration incomplete: %s", err, exc=True)
-  return (master_netdev, master_ip, master_node)
+  return (master_netdev, master_ip, master_node, primary_ip_family)
 
 
 def StartMaster(start_daemons, no_voting):
   """Activate local node as master node.
 
 
 
 def StartMaster(start_daemons, no_voting):
   """Activate local node as master node.
 
-  The function will always try activate the IP address of the master
-  (unless someone else has it). It will also start the master daemons,
-  based on the start_daemons parameter.
+  The function will either try activate the IP address of the master
+  (unless someone else has it) or also start the master daemons, based
+  on the start_daemons parameter.
 
   @type start_daemons: boolean
 
   @type start_daemons: boolean
-  @param start_daemons: whether to also start the master
-      daemons (ganeti-masterd and ganeti-rapi)
+  @param start_daemons: whether to start the master daemons
+      (ganeti-masterd and ganeti-rapi), or (if false) activate the
+      master ip
   @type no_voting: boolean
   @param no_voting: whether to start ganeti-masterd without a node vote
       (if start_daemons is True), but still non-interactively
   @type no_voting: boolean
   @param no_voting: whether to start ganeti-masterd without a node vote
       (if start_daemons is True), but still non-interactively
@@ -194,48 +264,60 @@ def StartMaster(start_daemons, no_voting):
 
   """
   # GetMasterInfo will raise an exception if not able to return data
 
   """
   # GetMasterInfo will raise an exception if not able to return data
-  master_netdev, master_ip, _ = GetMasterInfo()
+  master_netdev, master_ip, _, family = GetMasterInfo()
 
   err_msgs = []
 
   err_msgs = []
-  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
-    if utils.OwnIpAddress(master_ip):
-      # we already have the ip:
-      logging.debug("Master IP already configured, doing nothing")
+  # either start the master and rapi daemons
+  if start_daemons:
+    if no_voting:
+      masterd_args = "--no-voting --yes-do-it"
     else:
     else:
-      msg = "Someone else has the master ip, not activating"
-      logging.error(msg)
-      err_msgs.append(msg)
-  else:
-    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
-                           "dev", master_netdev, "label",
-                           "%s:0" % master_netdev])
+      masterd_args = ""
+
+    env = {
+      "EXTRA_MASTERD_ARGS": masterd_args,
+      }
+
+    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
     if result.failed:
     if result.failed:
-      msg = "Can't activate master IP: %s" % result.output
+      msg = "Can't start Ganeti master: %s" % result.output
       logging.error(msg)
       err_msgs.append(msg)
       logging.error(msg)
       err_msgs.append(msg)
-
-    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
-                           "-s", master_ip, master_ip])
-    # we'll ignore the exit code of arping
-
-  # and now start the master and rapi daemons
-  if start_daemons:
-    daemons_params = {
-        'ganeti-masterd': [],
-        'ganeti-rapi': [],
-        }
-    if no_voting:
-      daemons_params['ganeti-masterd'].append('--no-voting')
-      daemons_params['ganeti-masterd'].append('--yes-do-it')
-    for daemon in daemons_params:
-      cmd = [daemon]
-      cmd.extend(daemons_params[daemon])
-      result = utils.RunCmd(cmd)
+  # or activate the IP
+  else:
+    if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
+      if netutils.IPAddress.Own(master_ip):
+        # we already have the ip:
+        logging.debug("Master IP already configured, doing nothing")
+      else:
+        msg = "Someone else has the master ip, not activating"
+        logging.error(msg)
+        err_msgs.append(msg)
+    else:
+      ipcls = netutils.IP4Address
+      if family == netutils.IP6Address.family:
+        ipcls = netutils.IP6Address
+
+      result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
+                             "%s/%d" % (master_ip, ipcls.iplen),
+                             "dev", master_netdev, "label",
+                             "%s:0" % master_netdev])
       if result.failed:
       if result.failed:
-        msg = "Can't start daemon %s: %s" % (daemon, result.output)
+        msg = "Can't activate master IP: %s" % result.output
         logging.error(msg)
         err_msgs.append(msg)
 
         logging.error(msg)
         err_msgs.append(msg)
 
+      # we ignore the exit code of the following cmds
+      if ipcls == netutils.IP4Address:
+        utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
+                      master_ip, master_ip])
+      elif ipcls == netutils.IP6Address:
+        try:
+          utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
+        except errors.OpExecError:
+          # TODO: Better error reporting
+          logging.warning("Can't execute ndisc6, please install if missing")
+
   if err_msgs:
     _Fail("; ".join(err_msgs))
 
   if err_msgs:
     _Fail("; ".join(err_msgs))
 
@@ -257,66 +339,50 @@ def StopMaster(stop_daemons):
   # need to decide in which case we fail the RPC for this
 
   # GetMasterInfo will raise an exception if not able to return data
   # need to decide in which case we fail the RPC for this
 
   # GetMasterInfo will raise an exception if not able to return data
-  master_netdev, master_ip, _ = GetMasterInfo()
+  master_netdev, master_ip, _, family = GetMasterInfo()
 
 
-  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
+  ipcls = netutils.IP4Address
+  if family == netutils.IP6Address.family:
+    ipcls = netutils.IP6Address
+
+  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
+                         "%s/%d" % (master_ip, ipcls.iplen),
                          "dev", master_netdev])
   if result.failed:
     logging.error("Can't remove the master IP, error: %s", result.output)
     # but otherwise ignore the failure
 
   if stop_daemons:
                          "dev", master_netdev])
   if result.failed:
     logging.error("Can't remove the master IP, error: %s", result.output)
     # but otherwise ignore the failure
 
   if stop_daemons:
-    # stop/kill the rapi and the master daemon
-    for daemon in constants.RAPI_PID, constants.MASTERD_PID:
-      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
-
-
-def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
-  """Joins this node to the cluster.
-
-  This does the following:
-      - updates the hostkeys of the machine (rsa and dsa)
-      - adds the ssh private key to the user
-      - adds the ssh public key to the users' authorized_keys file
-
-  @type dsa: str
-  @param dsa: the DSA private key to write
-  @type dsapub: str
-  @param dsapub: the DSA public key to write
-  @type rsa: str
-  @param rsa: the RSA private key to write
-  @type rsapub: str
-  @param rsapub: the RSA public key to write
-  @type sshkey: str
-  @param sshkey: the SSH private key to write
-  @type sshpub: str
-  @param sshpub: the SSH public key to write
-  @rtype: boolean
-  @return: the success of the operation
-
-  """
-  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
-                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
-                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
-                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
-  for name, content, mode in sshd_keys:
-    utils.WriteFile(name, data=content, mode=mode)
+    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
+    if result.failed:
+      logging.error("Could not stop Ganeti master, command %s had exitcode %s"
+                    " and error %s",
+                    result.cmd, result.exit_code, result.output)
 
 
-  try:
-    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
-                                                    mkdir=True)
-  except errors.OpExecError, err:
-    _Fail("Error while processing user ssh files: %s", err, exc=True)
 
 
-  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
-    utils.WriteFile(name, data=content, mode=0600)
+def EtcHostsModify(mode, host, ip):
+  """Modify a host entry in /etc/hosts.
 
 
-  utils.AddAuthorizedKey(auth_keys, sshpub)
+  @param mode: The mode to operate. Either add or remove entry
+  @param host: The host to operate on
+  @param ip: The ip associated with the entry
 
 
-  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
+  """
+  if mode == constants.ETC_HOSTS_ADD:
+    if not ip:
+      RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
+              " present")
+    utils.AddHostToEtcHosts(host, ip)
+  elif mode == constants.ETC_HOSTS_REMOVE:
+    if ip:
+      RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
+              " parameter is present")
+    utils.RemoveHostFromEtcHosts(host)
+  else:
+    RPCFail("Mode not supported")
 
 
 
 
-def LeaveCluster():
+def LeaveCluster(modify_ssh_setup):
   """Cleans up and remove the current node.
 
   This function cleans up and prepares the current node to be removed
   """Cleans up and remove the current node.
 
   This function cleans up and prepares the current node to be removed
@@ -326,26 +392,40 @@ def LeaveCluster():
   L{errors.QuitGanetiException} which is used as a special case to
   shutdown the node daemon.
 
   L{errors.QuitGanetiException} which is used as a special case to
   shutdown the node daemon.
 
+  @param modify_ssh_setup: boolean
+
   """
   _CleanDirectory(constants.DATA_DIR)
   """
   _CleanDirectory(constants.DATA_DIR)
+  _CleanDirectory(constants.CRYPTO_KEYS_DIR)
   JobQueuePurge()
 
   JobQueuePurge()
 
-  try:
-    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
-
-    f = open(pub_key, 'r')
+  if modify_ssh_setup:
     try:
     try:
-      utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
-    finally:
-      f.close()
+      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
+
+      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
 
 
-    utils.RemoveFile(priv_key)
-    utils.RemoveFile(pub_key)
-  except errors.OpExecError:
-    logging.exception("Error while processing ssh files")
+      utils.RemoveFile(priv_key)
+      utils.RemoveFile(pub_key)
+    except errors.OpExecError:
+      logging.exception("Error while processing ssh files")
+
+  try:
+    utils.RemoveFile(constants.CONFD_HMAC_KEY)
+    utils.RemoveFile(constants.RAPI_CERT_FILE)
+    utils.RemoveFile(constants.SPICE_CERT_FILE)
+    utils.RemoveFile(constants.SPICE_CACERT_FILE)
+    utils.RemoveFile(constants.NODED_CERT_FILE)
+  except: # pylint: disable=W0702
+    logging.exception("Error while removing cluster secrets")
+
+  result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
+  if result.failed:
+    logging.error("Command %s failed with exitcode %s and error %s",
+                  result.cmd, result.exit_code, result.output)
 
   # Raise a custom exception (handled in ganeti-noded)
 
   # Raise a custom exception (handled in ganeti-noded)
-  raise errors.QuitGanetiException(True, 'Shutdown scheduled')
+  raise errors.QuitGanetiException(True, "Shutdown scheduled")
 
 
 def GetNodeInfo(vgname, hypervisor_type):
 
 
 def GetNodeInfo(vgname, hypervisor_type):
@@ -366,20 +446,23 @@ def GetNodeInfo(vgname, hypervisor_type):
 
   """
   outputarray = {}
 
   """
   outputarray = {}
-  vginfo = _GetVGInfo(vgname)
-  outputarray['vg_size'] = vginfo['vg_size']
-  outputarray['vg_free'] = vginfo['vg_free']
 
 
-  hyper = hypervisor.GetHypervisor(hypervisor_type)
-  hyp_info = hyper.GetNodeInfo()
-  if hyp_info is not None:
-    outputarray.update(hyp_info)
+  if vgname is not None:
+    vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
+    vg_free = vg_size = None
+    if vginfo:
+      vg_free = int(round(vginfo[0][0], 0))
+      vg_size = int(round(vginfo[0][1], 0))
+    outputarray["vg_size"] = vg_size
+    outputarray["vg_free"] = vg_free
 
 
-  f = open("/proc/sys/kernel/random/boot_id", 'r')
-  try:
-    outputarray["bootid"] = f.read(128).rstrip("\n")
-  finally:
-    f.close()
+  if hypervisor_type is not None:
+    hyper = hypervisor.GetHypervisor(hypervisor_type)
+    hyp_info = hyper.GetNodeInfo()
+    if hyp_info is not None:
+      outputarray.update(hyp_info)
+
+  outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
 
   return outputarray
 
 
   return outputarray
 
@@ -414,11 +497,27 @@ def VerifyNode(what, cluster_name):
 
   """
   result = {}
 
   """
   result = {}
+  my_name = netutils.Hostname.GetSysName()
+  port = netutils.GetDaemonPort(constants.NODED)
+  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
 
 
-  if constants.NV_HYPERVISOR in what:
+  if constants.NV_HYPERVISOR in what and vm_capable:
     result[constants.NV_HYPERVISOR] = tmp = {}
     for hv_name in what[constants.NV_HYPERVISOR]:
     result[constants.NV_HYPERVISOR] = tmp = {}
     for hv_name in what[constants.NV_HYPERVISOR]:
-      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
+      try:
+        val = hypervisor.GetHypervisor(hv_name).Verify()
+      except errors.HypervisorError, err:
+        val = "Error while checking hypervisor: %s" % str(err)
+      tmp[hv_name] = val
+
+  if constants.NV_HVPARAMS in what and vm_capable:
+    result[constants.NV_HVPARAMS] = tmp = []
+    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
+      try:
+        logging.info("Validating hv %s, %s", hv_name, hvparms)
+        hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
+      except errors.HypervisorError, err:
+        tmp.append((source, hv_name, str(err)))
 
   if constants.NV_FILELIST in what:
     result[constants.NV_FILELIST] = utils.FingerprintFiles(
 
   if constants.NV_FILELIST in what:
     result[constants.NV_FILELIST] = utils.FingerprintFiles(
@@ -434,7 +533,6 @@ def VerifyNode(what, cluster_name):
 
   if constants.NV_NODENETTEST in what:
     result[constants.NV_NODENETTEST] = tmp = {}
 
   if constants.NV_NODENETTEST in what:
     result[constants.NV_NODENETTEST] = tmp = {}
-    my_name = utils.HostInfo().name
     my_pip = my_sip = None
     for name, pip, sip in what[constants.NV_NODENETTEST]:
       if name == my_name:
     my_pip = my_sip = None
     for name, pip, sip in what[constants.NV_NODENETTEST]:
       if name == my_name:
@@ -445,37 +543,76 @@ def VerifyNode(what, cluster_name):
       tmp[my_name] = ("Can't find my own primary/secondary IP"
                       " in the node list")
     else:
       tmp[my_name] = ("Can't find my own primary/secondary IP"
                       " in the node list")
     else:
-      port = utils.GetNodeDaemonPort()
       for name, pip, sip in what[constants.NV_NODENETTEST]:
         fail = []
       for name, pip, sip in what[constants.NV_NODENETTEST]:
         fail = []
-        if not utils.TcpPing(pip, port, source=my_pip):
+        if not netutils.TcpPing(pip, port, source=my_pip):
           fail.append("primary")
         if sip != pip:
           fail.append("primary")
         if sip != pip:
-          if not utils.TcpPing(sip, port, source=my_sip):
+          if not netutils.TcpPing(sip, port, source=my_sip):
             fail.append("secondary")
         if fail:
           tmp[name] = ("failure using the %s interface(s)" %
                        " and ".join(fail))
 
             fail.append("secondary")
         if fail:
           tmp[name] = ("failure using the %s interface(s)" %
                        " and ".join(fail))
 
-  if constants.NV_LVLIST in what:
-    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
+  if constants.NV_MASTERIP in what:
+    # FIXME: add checks on incoming data structures (here and in the
+    # rest of the function)
+    master_name, master_ip = what[constants.NV_MASTERIP]
+    if master_name == my_name:
+      source = constants.IP4_ADDRESS_LOCALHOST
+    else:
+      source = None
+    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
+                                                  source=source)
+
+  if constants.NV_OOB_PATHS in what:
+    result[constants.NV_OOB_PATHS] = tmp = []
+    for path in what[constants.NV_OOB_PATHS]:
+      try:
+        st = os.stat(path)
+      except OSError, err:
+        tmp.append("error stating out of band helper: %s" % err)
+      else:
+        if stat.S_ISREG(st.st_mode):
+          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
+            tmp.append(None)
+          else:
+            tmp.append("out of band helper %s is not executable" % path)
+        else:
+          tmp.append("out of band helper %s is not a file" % path)
+
+  if constants.NV_LVLIST in what and vm_capable:
+    try:
+      val = GetVolumeList(utils.ListVolumeGroups().keys())
+    except RPCFail, err:
+      val = str(err)
+    result[constants.NV_LVLIST] = val
 
 
-  if constants.NV_INSTANCELIST in what:
-    result[constants.NV_INSTANCELIST] = GetInstanceList(
-      what[constants.NV_INSTANCELIST])
+  if constants.NV_INSTANCELIST in what and vm_capable:
+    # GetInstanceList can fail
+    try:
+      val = GetInstanceList(what[constants.NV_INSTANCELIST])
+    except RPCFail, err:
+      val = str(err)
+    result[constants.NV_INSTANCELIST] = val
 
 
-  if constants.NV_VGLIST in what:
+  if constants.NV_VGLIST in what and vm_capable:
     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
 
     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
 
+  if constants.NV_PVLIST in what and vm_capable:
+    result[constants.NV_PVLIST] = \
+      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
+                                   filter_allocatable=False)
+
   if constants.NV_VERSION in what:
     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                     constants.RELEASE_VERSION)
 
   if constants.NV_VERSION in what:
     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                     constants.RELEASE_VERSION)
 
-  if constants.NV_HVINFO in what:
+  if constants.NV_HVINFO in what and vm_capable:
     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
 
     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
 
-  if constants.NV_DRBDLIST in what:
+  if constants.NV_DRBDLIST in what and vm_capable:
     try:
       used_minors = bdev.DRBD8.GetUsedDevs().keys()
     except errors.BlockDeviceError, err:
     try:
       used_minors = bdev.DRBD8.GetUsedDevs().keys()
     except errors.BlockDeviceError, err:
@@ -483,44 +620,121 @@ def VerifyNode(what, cluster_name):
       used_minors = str(err)
     result[constants.NV_DRBDLIST] = used_minors
 
       used_minors = str(err)
     result[constants.NV_DRBDLIST] = used_minors
 
+  if constants.NV_DRBDHELPER in what and vm_capable:
+    status = True
+    try:
+      payload = bdev.BaseDRBD.GetUsermodeHelper()
+    except errors.BlockDeviceError, err:
+      logging.error("Can't get DRBD usermode helper: %s", str(err))
+      status = False
+      payload = str(err)
+    result[constants.NV_DRBDHELPER] = (status, payload)
+
+  if constants.NV_NODESETUP in what:
+    result[constants.NV_NODESETUP] = tmpr = []
+    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
+      tmpr.append("The sysfs filesytem doesn't seem to be mounted"
+                  " under /sys, missing required directories /sys/block"
+                  " and /sys/class/net")
+    if (not os.path.isdir("/proc/sys") or
+        not os.path.isfile("/proc/sysrq-trigger")):
+      tmpr.append("The procfs filesystem doesn't seem to be mounted"
+                  " under /proc, missing required directory /proc/sys and"
+                  " the file /proc/sysrq-trigger")
+
+  if constants.NV_TIME in what:
+    result[constants.NV_TIME] = utils.SplitTime(time.time())
+
+  if constants.NV_OSLIST in what and vm_capable:
+    result[constants.NV_OSLIST] = DiagnoseOS()
+
+  if constants.NV_BRIDGES in what and vm_capable:
+    result[constants.NV_BRIDGES] = [bridge
+                                    for bridge in what[constants.NV_BRIDGES]
+                                    if not utils.BridgeExists(bridge)]
   return result
 
 
   return result
 
 
-def GetVolumeList(vg_name):
+def GetBlockDevSizes(devices):
+  """Return the size of the given block devices
+
+  @type devices: list
+  @param devices: list of block device nodes to query
+  @rtype: dict
+  @return:
+    dictionary of all block devices under /dev (key). The value is their
+    size in MiB.
+
+    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}
+
+  """
+  DEV_PREFIX = "/dev/"
+  blockdevs = {}
+
+  for devpath in devices:
+    if not utils.IsBelowDir(DEV_PREFIX, devpath):
+      continue
+
+    try:
+      st = os.stat(devpath)
+    except EnvironmentError, err:
+      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
+      continue
+
+    if stat.S_ISBLK(st.st_mode):
+      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
+      if result.failed:
+        # We don't want to fail, just do not list this device as available
+        logging.warning("Cannot get size for block device %s", devpath)
+        continue
+
+      size = int(result.stdout) / (1024 * 1024)
+      blockdevs[devpath] = size
+  return blockdevs
+
+
+def GetVolumeList(vg_names):
   """Compute list of logical volumes and their size.
 
   """Compute list of logical volumes and their size.
 
-  @type vg_name: str
-  @param vg_name: the volume group whose LVs we should list
+  @type vg_names: list
+  @param vg_names: the volume groups whose LVs we should list, or
+      empty for all volume groups
   @rtype: dict
   @return:
       dictionary of all partions (key) with value being a tuple of
       their size (in MiB), inactive and online status::
 
   @rtype: dict
   @return:
       dictionary of all partions (key) with value being a tuple of
       their size (in MiB), inactive and online status::
 
-        {'test1': ('20.06', True, True)}
+        {'xenvg/test1': ('20.06', True, True)}
 
       in case of errors, a string is returned with the error
       details.
 
   """
   lvs = {}
 
       in case of errors, a string is returned with the error
       details.
 
   """
   lvs = {}
-  sep = '|'
+  sep = "|"
+  if not vg_names:
+    vg_names = []
   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                          "--separator=%s" % sep,
   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                          "--separator=%s" % sep,
-                         "-olv_name,lv_size,lv_attr", vg_name])
+                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
   if result.failed:
     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
 
   if result.failed:
     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
 
-  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
   for line in result.stdout.splitlines():
     line = line.strip()
   for line in result.stdout.splitlines():
     line = line.strip()
-    match = valid_line_re.match(line)
+    match = _LVSLINE_REGEX.match(line)
     if not match:
       logging.error("Invalid line returned from lvs output: '%s'", line)
       continue
     if not match:
       logging.error("Invalid line returned from lvs output: '%s'", line)
       continue
-    name, size, attr = match.groups()
-    inactive = attr[4] == '-'
-    online = attr[5] == 'o'
-    lvs[name] = (size, inactive, online)
+    vg_name, name, size, attr = match.groups()
+    inactive = attr[4] == "-"
+    online = attr[5] == "o"
+    virtual = attr[0] == "v"
+    if virtual:
+      # we don't want to report such volumes as existing, since they
+      # don't really hold data
+      continue
+    lvs[vg_name + "/" + name] = (size, inactive, online)
 
   return lvs
 
 
   return lvs
 
@@ -563,21 +777,23 @@ def NodeVolumes():
           result.output)
 
   def parse_dev(dev):
           result.output)
 
   def parse_dev(dev):
-    if '(' in dev:
-      return dev.split('(')[0]
-    else:
-      return dev
+    return dev.split("(")[0]
+
+  def handle_dev(dev):
+    return [parse_dev(x) for x in dev.split(",")]
 
   def map_line(line):
 
   def map_line(line):
-    return {
-      'name': line[0].strip(),
-      'size': line[1].strip(),
-      'dev': parse_dev(line[2].strip()),
-      'vg': line[3].strip(),
-    }
+    line = [v.strip() for v in line]
+    return [{"name": line[0], "size": line[1],
+             "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]
 
 
-  return [map_line(line.split('|')) for line in result.stdout.splitlines()
-          if line.count('|') >= 3]
+  all_devs = []
+  for line in result.stdout.splitlines():
+    if line.count("|") >= 3:
+      all_devs.extend(map_line(line.split("|")))
+    else:
+      logging.warning("Strange line in the output from lvs: '%s'", line)
+  return all_devs
 
 
 def BridgesExist(bridges_list):
 
 
 def BridgesExist(bridges_list):
@@ -593,7 +809,7 @@ def BridgesExist(bridges_list):
       missing.append(bridge)
 
   if missing:
       missing.append(bridge)
 
   if missing:
-    _Fail("Missing bridges %s", ", ".join(missing))
+    _Fail("Missing bridges %s", utils.CommaJoin(missing))
 
 
 def GetInstanceList(hypervisor_list):
 
 
 def GetInstanceList(hypervisor_list):
@@ -639,9 +855,9 @@ def GetInstanceInfo(instance, hname):
 
   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
   if iinfo is not None:
 
   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
   if iinfo is not None:
-    output['memory'] = iinfo[2]
-    output['state'] = iinfo[4]
-    output['time'] = iinfo[5]
+    output["memory"] = iinfo[2]
+    output["state"] = iinfo[4]
+    output["time"] = iinfo[5]
 
   return output
 
 
   return output
 
@@ -666,7 +882,8 @@ def GetInstanceMigratable(instance):
   for idx in range(len(instance.disks)):
     link_name = _GetBlockDevSymlinkPath(iname, idx)
     if not os.path.islink(link_name):
   for idx in range(len(instance.disks)):
     link_name = _GetBlockDevSymlinkPath(iname, idx)
     if not os.path.islink(link_name):
-      _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
+      logging.warning("Instance %s is missing symlink %s for disk %d",
+                      iname, link_name, idx)
 
 
 def GetAllInstancesInfo(hypervisor_list):
 
 
 def GetAllInstancesInfo(hypervisor_list):
@@ -694,16 +911,16 @@ def GetAllInstancesInfo(hypervisor_list):
     if iinfo:
       for name, _, memory, vcpus, state, times in iinfo:
         value = {
     if iinfo:
       for name, _, memory, vcpus, state, times in iinfo:
         value = {
-          'memory': memory,
-          'vcpus': vcpus,
-          'state': state,
-          'time': times,
+          "memory": memory,
+          "vcpus": vcpus,
+          "state": state,
+          "time": times,
           }
         if name in output:
           # we only check static parameters, like memory and vcpus,
           # and not state and time which can change between the
           # invocations of the different hypervisors
           }
         if name in output:
           # we only check static parameters, like memory and vcpus,
           # and not state and time which can change between the
           # invocations of the different hypervisors
-          for key in 'memory', 'vcpus':
+          for key in "memory", "vcpus":
             if value[key] != output[name][key]:
               _Fail("Instance %s is running twice"
                     " with different parameters", name)
             if value[key] != output[name][key]:
               _Fail("Instance %s is running twice"
                     " with different parameters", name)
@@ -712,27 +929,56 @@ def GetAllInstancesInfo(hypervisor_list):
   return output
 
 
   return output
 
 
-def InstanceOsAdd(instance, reinstall):
+def _InstanceLogName(kind, os_name, instance, component):
+  """Compute the OS log filename for a given instance and operation.
+
+  The instance name and os name are passed in as strings since not all
+  operations have these as part of an instance object.
+
+  @type kind: string
+  @param kind: the operation type (e.g. add, import, etc.)
+  @type os_name: string
+  @param os_name: the os name
+  @type instance: string
+  @param instance: the name of the instance being imported/added/etc.
+  @type component: string or None
+  @param component: the name of the component of the instance being
+      transferred
+
+  """
+  # TODO: Use tempfile.mkstemp to create unique filename
+  if component:
+    assert "/" not in component
+    c_msg = "-%s" % component
+  else:
+    c_msg = ""
+  base = ("%s-%s-%s%s-%s.log" %
+          (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
+  return utils.PathJoin(constants.LOG_OS_DIR, base)
+
+
+def InstanceOsAdd(instance, reinstall, debug):
   """Add an OS to an instance.
 
   @type instance: L{objects.Instance}
   @param instance: Instance whose OS is to be installed
   @type reinstall: boolean
   @param reinstall: whether this is an instance reinstall
   """Add an OS to an instance.
 
   @type instance: L{objects.Instance}
   @param instance: Instance whose OS is to be installed
   @type reinstall: boolean
   @param reinstall: whether this is an instance reinstall
+  @type debug: integer
+  @param debug: debug level, passed to the OS scripts
   @rtype: None
 
   """
   inst_os = OSFromDisk(instance.os)
 
   @rtype: None
 
   """
   inst_os = OSFromDisk(instance.os)
 
-  create_env = OSEnvironment(instance, inst_os)
+  create_env = OSEnvironment(instance, inst_os, debug)
   if reinstall:
   if reinstall:
-    create_env['INSTANCE_REINSTALL'] = "1"
+    create_env["INSTANCE_REINSTALL"] = "1"
 
 
-  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
-                                     instance.name, int(time.time()))
+  logfile = _InstanceLogName("add", instance.os, instance.name, None)
 
   result = utils.RunCmd([inst_os.create_script], env=create_env,
 
   result = utils.RunCmd([inst_os.create_script], env=create_env,
-                        cwd=inst_os.path, output=logfile,)
+                        cwd=inst_os.path, output=logfile, reset_env=True)
   if result.failed:
     logging.error("os create command '%s' returned error: %s, logfile: %s,"
                   " output: %s", result.cmd, result.fail_reason, logfile,
   if result.failed:
     logging.error("os create command '%s' returned error: %s, logfile: %s,"
                   " output: %s", result.cmd, result.fail_reason, logfile,
@@ -743,28 +989,29 @@ def InstanceOsAdd(instance, reinstall):
           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
 
 
           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
 
 
-def RunRenameInstance(instance, old_name):
+def RunRenameInstance(instance, old_name, debug):
   """Run the OS rename script for an instance.
 
   @type instance: L{objects.Instance}
   @param instance: Instance whose OS is to be installed
   @type old_name: string
   @param old_name: previous instance name
   """Run the OS rename script for an instance.
 
   @type instance: L{objects.Instance}
   @param instance: Instance whose OS is to be installed
   @type old_name: string
   @param old_name: previous instance name
+  @type debug: integer
+  @param debug: debug level, passed to the OS scripts
   @rtype: boolean
   @return: the success of the operation
 
   """
   inst_os = OSFromDisk(instance.os)
 
   @rtype: boolean
   @return: the success of the operation
 
   """
   inst_os = OSFromDisk(instance.os)
 
-  rename_env = OSEnvironment(instance, inst_os)
-  rename_env['OLD_INSTANCE_NAME'] = old_name
+  rename_env = OSEnvironment(instance, inst_os, debug)
+  rename_env["OLD_INSTANCE_NAME"] = old_name
 
 
-  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
-                                           old_name,
-                                           instance.name, int(time.time()))
+  logfile = _InstanceLogName("rename", instance.os,
+                             "%s-%s" % (old_name, instance.name), None)
 
   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
 
   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
-                        cwd=inst_os.path, output=logfile)
+                        cwd=inst_os.path, output=logfile, reset_env=True)
 
   if result.failed:
     logging.error("os create command '%s' returned error: %s output: %s",
 
   if result.failed:
     logging.error("os create command '%s' returned error: %s output: %s",
@@ -775,49 +1022,9 @@ def RunRenameInstance(instance, old_name):
           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
 
 
           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
 
 
-def _GetVGInfo(vg_name):
-  """Get information about the volume group.
-
-  @type vg_name: str
-  @param vg_name: the volume group which we query
-  @rtype: dict
-  @return:
-    A dictionary with the following keys:
-      - C{vg_size} is the total size of the volume group in MiB
-      - C{vg_free} is the free size of the volume group in MiB
-      - C{pv_count} are the number of physical disks in that VG
-
-    If an error occurs during gathering of data, we return the same dict
-    with keys all set to None.
-
-  """
-  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
-
-  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
-                         "--nosuffix", "--units=m", "--separator=:", vg_name])
-
-  if retval.failed:
-    logging.error("volume group %s not present", vg_name)
-    return retdic
-  valarr = retval.stdout.strip().rstrip(':').split(':')
-  if len(valarr) == 3:
-    try:
-      retdic = {
-        "vg_size": int(round(float(valarr[0]), 0)),
-        "vg_free": int(round(float(valarr[1]), 0)),
-        "pv_count": int(valarr[2]),
-        }
-    except ValueError, err:
-      logging.exception("Fail to parse vgs output: %s", err)
-  else:
-    logging.error("vgs output has the wrong number of fields (expected"
-                  " three): %s", str(valarr))
-  return retdic
-
-
 def _GetBlockDevSymlinkPath(instance_name, idx):
 def _GetBlockDevSymlinkPath(instance_name, idx):
-  return os.path.join(constants.DISK_LINKS_DIR,
-                      "%s:%d" % (instance_name, idx))
+  return utils.PathJoin(constants.DISK_LINKS_DIR, "%s%s%d" %
+                        (instance_name, constants.DISK_SEPARATOR, idx))
 
 
 def _SymlinkBlockDev(instance_name, device_path, idx):
 
 
 def _SymlinkBlockDev(instance_name, device_path, idx):
@@ -891,11 +1098,13 @@ def _GatherAndLinkBlockDevs(instance):
   return block_devices
 
 
   return block_devices
 
 
-def StartInstance(instance):
+def StartInstance(instance, startup_paused):
   """Start an instance.
 
   @type instance: L{objects.Instance}
   @param instance: the instance object
   """Start an instance.
 
   @type instance: L{objects.Instance}
   @param instance: the instance object
+  @type startup_paused: bool
+  @param instance: pause instance at startup?
   @rtype: None
 
   """
   @rtype: None
 
   """
@@ -908,7 +1117,7 @@ def StartInstance(instance):
   try:
     block_devices = _GatherAndLinkBlockDevs(instance)
     hyper = hypervisor.GetHypervisor(instance.hypervisor)
   try:
     block_devices = _GatherAndLinkBlockDevs(instance)
     hyper = hypervisor.GetHypervisor(instance.hypervisor)
-    hyper.StartInstance(instance, block_devices)
+    hyper.StartInstance(instance, block_devices, startup_paused)
   except errors.BlockDeviceError, err:
     _Fail("Block device error: %s", err, exc=True)
   except errors.HypervisorError, err:
   except errors.BlockDeviceError, err:
     _Fail("Block device error: %s", err, exc=True)
   except errors.HypervisorError, err:
@@ -916,54 +1125,76 @@ def StartInstance(instance):
     _Fail("Hypervisor error: %s", err, exc=True)
 
 
     _Fail("Hypervisor error: %s", err, exc=True)
 
 
-def InstanceShutdown(instance):
+def InstanceShutdown(instance, timeout):
   """Shut an instance down.
 
   @note: this functions uses polling with a hardcoded timeout.
 
   @type instance: L{objects.Instance}
   @param instance: the instance object
   """Shut an instance down.
 
   @note: this functions uses polling with a hardcoded timeout.
 
   @type instance: L{objects.Instance}
   @param instance: the instance object
+  @type timeout: integer
+  @param timeout: maximum timeout for soft shutdown
   @rtype: None
 
   """
   hv_name = instance.hypervisor
   @rtype: None
 
   """
   hv_name = instance.hypervisor
-  running_instances = GetInstanceList([hv_name])
+  hyper = hypervisor.GetHypervisor(hv_name)
   iname = instance.name
 
   iname = instance.name
 
-  if iname not in running_instances:
+  if instance.name not in hyper.ListInstances():
     logging.info("Instance %s not running, doing nothing", iname)
     return
 
     logging.info("Instance %s not running, doing nothing", iname)
     return
 
-  hyper = hypervisor.GetHypervisor(hv_name)
-  try:
-    hyper.StopInstance(instance)
-  except errors.HypervisorError, err:
-    _Fail("Failed to stop instance %s: %s", iname, err)
+  class _TryShutdown:
+    def __init__(self):
+      self.tried_once = False
 
 
-  # test every 10secs for 2min
+    def __call__(self):
+      if iname not in hyper.ListInstances():
+        return
 
 
-  time.sleep(1)
-  for _ in range(11):
-    if instance.name not in GetInstanceList([hv_name]):
-      break
-    time.sleep(10)
-  else:
+      try:
+        hyper.StopInstance(instance, retry=self.tried_once)
+      except errors.HypervisorError, err:
+        if iname not in hyper.ListInstances():
+          # if the instance is no longer existing, consider this a
+          # success and go to cleanup
+          return
+
+        _Fail("Failed to stop instance %s: %s", iname, err)
+
+      self.tried_once = True
+
+      raise utils.RetryAgain()
+
+  try:
+    utils.Retry(_TryShutdown(), 5, timeout)
+  except utils.RetryTimeout:
     # the shutdown did not succeed
     # the shutdown did not succeed
-    logging.error("Shutdown of '%s' unsuccessful, using destroy", iname)
+    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
 
     try:
       hyper.StopInstance(instance, force=True)
     except errors.HypervisorError, err:
 
     try:
       hyper.StopInstance(instance, force=True)
     except errors.HypervisorError, err:
-      _Fail("Failed to force stop instance %s: %s", iname, err)
+      if iname in hyper.ListInstances():
+        # only raise an error if the instance still exists, otherwise
+        # the error could simply be "instance ... unknown"!
+        _Fail("Failed to force stop instance %s: %s", iname, err)
 
     time.sleep(1)
 
     time.sleep(1)
-    if instance.name in GetInstanceList([hv_name]):
+
+    if iname in hyper.ListInstances():
       _Fail("Could not shutdown instance %s even by destroy", iname)
 
       _Fail("Could not shutdown instance %s even by destroy", iname)
 
+  try:
+    hyper.CleanupInstance(instance.name)
+  except errors.HypervisorError, err:
+    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
+
   _RemoveBlockDevLinks(iname, instance.disks)
 
 
   _RemoveBlockDevLinks(iname, instance.disks)
 
 
-def InstanceReboot(instance, reboot_type):
+def InstanceReboot(instance, reboot_type, shutdown_timeout):
   """Reboot an instance.
 
   @type instance: L{objects.Instance}
   """Reboot an instance.
 
   @type instance: L{objects.Instance}
@@ -979,6 +1210,8 @@ def InstanceReboot(instance, reboot_type):
         not accepted here, since that mode is handled differently, in
         cmdlib, and translates into full stop and start of the
         instance (instead of a call_instance_reboot RPC)
         not accepted here, since that mode is handled differently, in
         cmdlib, and translates into full stop and start of the
         instance (instead of a call_instance_reboot RPC)
+  @type shutdown_timeout: integer
+  @param shutdown_timeout: maximum timeout for soft shutdown
   @rtype: None
 
   """
   @rtype: None
 
   """
@@ -995,8 +1228,8 @@ def InstanceReboot(instance, reboot_type):
       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
     try:
       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
     try:
-      InstanceShutdown(instance)
-      return StartInstance(instance)
+      InstanceShutdown(instance, shutdown_timeout)
+      return StartInstance(instance, False)
     except errors.HypervisorError, err:
       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
   else:
     except errors.HypervisorError, err:
       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
   else:
@@ -1029,10 +1262,21 @@ def AcceptInstance(instance, info, target):
   @param target: target host (usually ip), on this node
 
   """
   @param target: target host (usually ip), on this node
 
   """
+  # TODO: why is this required only for DTS_EXT_MIRROR?
+  if instance.disk_template in constants.DTS_EXT_MIRROR:
+    # Create the symlinks, as the disks are not active
+    # in any way
+    try:
+      _GatherAndLinkBlockDevs(instance)
+    except errors.BlockDeviceError, err:
+      _Fail("Block device error: %s", err, exc=True)
+
   hyper = hypervisor.GetHypervisor(instance.hypervisor)
   try:
     hyper.AcceptInstance(instance, info, target)
   except errors.HypervisorError, err:
   hyper = hypervisor.GetHypervisor(instance.hypervisor)
   try:
     hyper.AcceptInstance(instance, info, target)
   except errors.HypervisorError, err:
+    if instance.disk_template in constants.DTS_EXT_MIRROR:
+      _RemoveBlockDevLinks(instance.name, instance.disks)
     _Fail("Failed to accept instance: %s", err, exc=True)
 
 
     _Fail("Failed to accept instance: %s", err, exc=True)
 
 
@@ -1064,16 +1308,13 @@ def MigrateInstance(instance, target, live):
   @type live: boolean
   @param live: whether the migration should be done live or not (the
       interpretation of this parameter is left to the hypervisor)
   @type live: boolean
   @param live: whether the migration should be done live or not (the
       interpretation of this parameter is left to the hypervisor)
-  @rtype: tuple
-  @return: a tuple of (success, msg) where:
-      - succes is a boolean denoting the success/failure of the operation
-      - msg is a string with details in case of failure
+  @raise RPCFail: if migration fails for some reason
 
   """
   hyper = hypervisor.GetHypervisor(instance.hypervisor)
 
   try:
 
   """
   hyper = hypervisor.GetHypervisor(instance.hypervisor)
 
   try:
-    hyper.MigrateInstance(instance.name, target, live)
+    hyper.MigrateInstance(instance, target, live)
   except errors.HypervisorError, err:
     _Fail("Failed to migrate instance: %s", err, exc=True)
 
   except errors.HypervisorError, err:
     _Fail("Failed to migrate instance: %s", err, exc=True)
 
@@ -1099,6 +1340,8 @@ def BlockdevCreate(disk, size, owner, on_primary, info):
       it's not required to return anything.
 
   """
       it's not required to return anything.
 
   """
+  # TODO: remove the obsolete "size" argument
+  # pylint: disable=W0613
   clist = []
   if disk.children:
     for child in disk.children:
   clist = []
   if disk.children:
     for child in disk.children:
@@ -1110,6 +1353,7 @@ def BlockdevCreate(disk, size, owner, on_primary, info):
         # we need the children open in case the device itself has to
         # be assembled
         try:
         # we need the children open in case the device itself has to
         # be assembled
         try:
+          # pylint: disable=E1103
           crdev.Open()
         except errors.BlockDeviceError, err:
           _Fail("Can't make child '%s' read-write: %s", child, err)
           crdev.Open()
         except errors.BlockDeviceError, err:
           _Fail("Can't make child '%s' read-write: %s", child, err)
@@ -1139,6 +1383,87 @@ def BlockdevCreate(disk, size, owner, on_primary, info):
   return device.unique_id
 
 
   return device.unique_id
 
 
+def _WipeDevice(path, offset, size):
+  """This function actually wipes the device.
+
+  @param path: The path to the device to wipe
+  @param offset: The offset in MiB in the file
+  @param size: The size in MiB to write
+
+  """
+  cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
+         "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path,
+         "count=%d" % size]
+  result = utils.RunCmd(cmd)
+
+  if result.failed:
+    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
+          result.fail_reason, result.output)
+
+
+def BlockdevWipe(disk, offset, size):
+  """Wipes a block device.
+
+  @type disk: L{objects.Disk}
+  @param disk: the disk object we want to wipe
+  @type offset: int
+  @param offset: The offset in MiB in the file
+  @type size: int
+  @param size: The size in MiB to write
+
+  """
+  try:
+    rdev = _RecursiveFindBD(disk)
+  except errors.BlockDeviceError:
+    rdev = None
+
+  if not rdev:
+    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
+
+  # Do cross verify some of the parameters
+  if offset > rdev.size:
+    _Fail("Offset is bigger than device size")
+  if (offset + size) > rdev.size:
+    _Fail("The provided offset and size to wipe is bigger than device size")
+
+  _WipeDevice(rdev.dev_path, offset, size)
+
+
+def BlockdevPauseResumeSync(disks, pause):
+  """Pause or resume the sync of the block device.
+
+  @type disks: list of L{objects.Disk}
+  @param disks: the disks object we want to pause/resume
+  @type pause: bool
+  @param pause: Wheater to pause or resume
+
+  """
+  success = []
+  for disk in disks:
+    try:
+      rdev = _RecursiveFindBD(disk)
+    except errors.BlockDeviceError:
+      rdev = None
+
+    if not rdev:
+      success.append((False, ("Cannot change sync for device %s:"
+                              " device not found" % disk.iv_name)))
+      continue
+
+    result = rdev.PauseResumeSync(pause)
+
+    if result:
+      success.append((result, None))
+    else:
+      if pause:
+        msg = "Pause"
+      else:
+        msg = "Resume"
+      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))
+
+  return success
+
+
 def BlockdevRemove(disk):
   """Remove a block device.
 
 def BlockdevRemove(disk):
   """Remove a block device.
 
@@ -1231,7 +1556,7 @@ def _RecursiveAssembleBD(disk, owner, as_primary):
   return result
 
 
   return result
 
 
-def BlockdevAssemble(disk, owner, as_primary):
+def BlockdevAssemble(disk, owner, as_primary, idx):
   """Activate a block device for an instance.
 
   This is a wrapper over _RecursiveAssembleBD.
   """Activate a block device for an instance.
 
   This is a wrapper over _RecursiveAssembleBD.
@@ -1244,9 +1569,14 @@ def BlockdevAssemble(disk, owner, as_primary):
   try:
     result = _RecursiveAssembleBD(disk, owner, as_primary)
     if isinstance(result, bdev.BlockDev):
   try:
     result = _RecursiveAssembleBD(disk, owner, as_primary)
     if isinstance(result, bdev.BlockDev):
+      # pylint: disable=E1103
       result = result.dev_path
       result = result.dev_path
+      if as_primary:
+        _SymlinkBlockDev(owner, result, idx)
   except errors.BlockDeviceError, err:
     _Fail("Error while assembling disk: %s", err, exc=True)
   except errors.BlockDeviceError, err:
     _Fail("Error while assembling disk: %s", err, exc=True)
+  except OSError, err:
+    _Fail("Error while symlinking disk: %s", err, exc=True)
 
   return result
 
 
   return result
 
@@ -1331,6 +1661,8 @@ def BlockdevRemovechildren(parent_cdev, new_cdevs):
       else:
         devs.append(bd.dev_path)
     else:
       else:
         devs.append(bd.dev_path)
     else:
+      if not utils.IsNormAbsPath(rpath):
+        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
       devs.append(rpath)
   parent_bdev.RemoveChildren(devs)
 
       devs.append(rpath)
   parent_bdev.RemoveChildren(devs)
 
@@ -1341,9 +1673,7 @@ def BlockdevGetmirrorstatus(disks):
   @type disks: list of L{objects.Disk}
   @param disks: the list of disks which we should query
   @rtype: disk
   @type disks: list of L{objects.Disk}
   @param disks: the list of disks which we should query
   @rtype: disk
-  @return:
-      a list of (mirror_done, estimated_time) tuples, which
-      are the result of L{bdev.BlockDev.CombinedSyncStatus}
+  @return: List of L{objects.BlockDevStatus}, one for each disk
   @raise errors.BlockDeviceError: if any of the disks cannot be
       found
 
   @raise errors.BlockDeviceError: if any of the disks cannot be
       found
 
@@ -1353,10 +1683,43 @@ def BlockdevGetmirrorstatus(disks):
     rbd = _RecursiveFindBD(dsk)
     if rbd is None:
       _Fail("Can't find device %s", dsk)
     rbd = _RecursiveFindBD(dsk)
     if rbd is None:
       _Fail("Can't find device %s", dsk)
+
     stats.append(rbd.CombinedSyncStatus())
     stats.append(rbd.CombinedSyncStatus())
+
   return stats
 
 
   return stats
 
 
+def BlockdevGetmirrorstatusMulti(disks):
+  """Get the mirroring status of a list of devices.
+
+  @type disks: list of L{objects.Disk}
+  @param disks: the list of disks which we should query
+  @rtype: disk
+  @return: List of tuples, (bool, status), one for each disk; bool denotes
+    success/failure, status is L{objects.BlockDevStatus} on success, string
+    otherwise
+
+  """
+  result = []
+  for disk in disks:
+    try:
+      rbd = _RecursiveFindBD(disk)
+      if rbd is None:
+        result.append((False, "Can't find device %s" % disk))
+        continue
+
+      status = rbd.CombinedSyncStatus()
+    except errors.BlockDeviceError, err:
+      logging.exception("Error while getting disk status")
+      result.append((False, str(err)))
+    else:
+      result.append((True, status))
+
+  assert len(disks) == len(result)
+
+  return result
+
+
 def _RecursiveFindBD(disk):
   """Check if a device is activated.
 
 def _RecursiveFindBD(disk):
   """Check if a device is activated.
 
@@ -1377,6 +1740,22 @@ def _RecursiveFindBD(disk):
   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
 
 
   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
 
 
+def _OpenRealBD(disk):
+  """Opens the underlying block device of a disk.
+
+  @type disk: L{objects.Disk}
+  @param disk: the disk object we want to open
+
+  """
+  real_disk = _RecursiveFindBD(disk)
+  if real_disk is None:
+    _Fail("Block device '%s' is not set up", disk)
+
+  real_disk.Open()
+
+  return real_disk
+
+
 def BlockdevFind(disk):
   """Check if a device is activated.
 
 def BlockdevFind(disk):
   """Check if a device is activated.
 
@@ -1384,19 +1763,90 @@ def BlockdevFind(disk):
 
   @type disk: L{objects.Disk}
   @param disk: the disk to find
 
   @type disk: L{objects.Disk}
   @param disk: the disk to find
-  @rtype: None or tuple
-  @return: None if the disk cannot be found, otherwise a
-      tuple (device_path, major, minor, sync_percent,
-      estimated_time, is_degraded)
+  @rtype: None or objects.BlockDevStatus
+  @return: None if the disk cannot be found, otherwise a the current
+           information
 
   """
   try:
     rbd = _RecursiveFindBD(disk)
   except errors.BlockDeviceError, err:
     _Fail("Failed to find device: %s", err, exc=True)
 
   """
   try:
     rbd = _RecursiveFindBD(disk)
   except errors.BlockDeviceError, err:
     _Fail("Failed to find device: %s", err, exc=True)
+
   if rbd is None:
     return None
   if rbd is None:
     return None
-  return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
+
+  return rbd.GetSyncStatus()
+
+
+def BlockdevGetsize(disks):
+  """Computes the size of the given disks.
+
+  If a disk is not found, returns None instead.
+
+  @type disks: list of L{objects.Disk}
+  @param disks: the list of disk to compute the size for
+  @rtype: list
+  @return: list with elements None if the disk cannot be found,
+      otherwise the size
+
+  """
+  result = []
+  for cf in disks:
+    try:
+      rbd = _RecursiveFindBD(cf)
+    except errors.BlockDeviceError:
+      result.append(None)
+      continue
+    if rbd is None:
+      result.append(None)
+    else:
+      result.append(rbd.GetActualSize())
+  return result
+
+
+def BlockdevExport(disk, dest_node, dest_path, cluster_name):
+  """Export a block device to a remote node.
+
+  @type disk: L{objects.Disk}
+  @param disk: the description of the disk to export
+  @type dest_node: str
+  @param dest_node: the destination node to export to
+  @type dest_path: str
+  @param dest_path: the destination path on the target node
+  @type cluster_name: str
+  @param cluster_name: the cluster name, needed for SSH hostalias
+  @rtype: None
+
+  """
+  real_disk = _OpenRealBD(disk)
+
+  # the block size on the read dd is 1MiB to match our units
+  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
+                               "dd if=%s bs=1048576 count=%s",
+                               real_disk.dev_path, str(disk.size))
+
+  # we set here a smaller block size as, due to ssh buffering, more
+  # than 64-128k will mostly ignored; we use nocreat to fail if the
+  # device is not already there or we pass a wrong path; we use
+  # notrunc to no attempt truncate on an LV device; we use oflag=dsync
+  # to not buffer too much memory; this means that at best, we flush
+  # every 64k, which will not be very fast
+  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
+                                " oflag=dsync", dest_path)
+
+  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
+                                                   constants.GANETI_RUNAS,
+                                                   destcmd)
+
+  # all commands have been checked, so we're safe to combine them
+  command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])
+
+  result = utils.RunCmd(["bash", "-c", command])
+
+  if result.failed:
+    _Fail("Disk copy command '%s' returned error: %s"
+          " output: %s", command, result.fail_reason, result.output)
 
 
 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
 
 
 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
@@ -1411,10 +1861,10 @@ def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
   @param data: the new contents of the file
   @type mode: int
   @param mode: the mode to give the file (can be None)
   @param data: the new contents of the file
   @type mode: int
   @param mode: the mode to give the file (can be None)
-  @type uid: int
-  @param uid: the owner of the file (can be -1 for default)
-  @type gid: int
-  @param gid: the group of the file (can be -1 for default)
+  @type uid: string
+  @param uid: the owner of the file
+  @type gid: string
+  @param gid: the group of the file
   @type atime: float
   @param atime: the atime to set on the file (can be None)
   @type mtime: float
   @type atime: float
   @param atime: the atime to set on the file (can be None)
   @type mtime: float
@@ -1425,27 +1875,43 @@ def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
   if not os.path.isabs(file_name):
     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
 
   if not os.path.isabs(file_name):
     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
 
-  allowed_files = set([
-    constants.CLUSTER_CONF_FILE,
-    constants.ETC_HOSTS,
-    constants.SSH_KNOWN_HOSTS_FILE,
-    constants.VNC_PASSWORD_FILE,
-    constants.RAPI_CERT_FILE,
-    constants.RAPI_USERS_FILE,
-    ])
-
-  for hv_name in constants.HYPER_TYPES:
-    hv_class = hypervisor.GetHypervisor(hv_name)
-    allowed_files.update(hv_class.GetAncillaryFiles())
-
-  if file_name not in allowed_files:
+  if file_name not in _ALLOWED_UPLOAD_FILES:
     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
           file_name)
 
   raw_data = _Decompress(data)
 
     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
           file_name)
 
   raw_data = _Decompress(data)
 
-  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
-                  atime=atime, mtime=mtime)
+  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
+    _Fail("Invalid username/groupname type")
+
+  getents = runtime.GetEnts()
+  uid = getents.LookupUser(uid)
+  gid = getents.LookupGroup(gid)
+
+  utils.SafeWriteFile(file_name, None,
+                      data=raw_data, mode=mode, uid=uid, gid=gid,
+                      atime=atime, mtime=mtime)
+
+
+def RunOob(oob_program, command, node, timeout):
+  """Executes oob_program with given command on given node.
+
+  @param oob_program: The path to the executable oob_program
+  @param command: The command to invoke on oob_program
+  @param node: The node given as an argument to the program
+  @param timeout: Timeout after which we kill the oob program
+
+  @return: stdout
+  @raise RPCFail: If execution fails for some reason
+
+  """
+  result = utils.RunCmd([oob_program, command, node], timeout=timeout)
+
+  if result.failed:
+    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
+          result.fail_reason, result.output)
+
+  return result.stdout
 
 
 def WriteSsconfFiles(values):
 
 
 def WriteSsconfFiles(values):
@@ -1468,39 +1934,37 @@ def _ErrnoOrStr(err):
   @param err: the exception to format
 
   """
   @param err: the exception to format
 
   """
-  if hasattr(err, 'errno'):
+  if hasattr(err, "errno"):
     detail = errno.errorcode[err.errno]
   else:
     detail = str(err)
   return detail
 
 
     detail = errno.errorcode[err.errno]
   else:
     detail = str(err)
   return detail
 
 
-def _OSOndiskAPIVersion(name, os_dir):
+def _OSOndiskAPIVersion(os_dir):
   """Compute and return the API version of a given OS.
 
   """Compute and return the API version of a given OS.
 
-  This function will try to read the API version of the OS given by
-  the 'name' parameter and residing in the 'os_dir' directory.
+  This function will try to read the API version of the OS residing in
+  the 'os_dir' directory.
 
 
-  @type name: str
-  @param name: the OS name we should look for
   @type os_dir: str
   @type os_dir: str
-  @param os_dir: the directory inwhich we should look for the OS
+  @param os_dir: the directory in which we should look for the OS
   @rtype: tuple
   @return: tuple (status, data) with status denoting the validity and
       data holding either the vaid versions or an error message
 
   """
   @rtype: tuple
   @return: tuple (status, data) with status denoting the validity and
       data holding either the vaid versions or an error message
 
   """
-  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
+  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
 
   try:
     st = os.stat(api_file)
   except EnvironmentError, err:
 
   try:
     st = os.stat(api_file)
   except EnvironmentError, err:
-    return False, ("Required file 'ganeti_api_version' file not"
-                   " found under path %s: %s" % (os_dir, _ErrnoOrStr(err)))
+    return False, ("Required file '%s' not found under path %s: %s" %
+                   (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
 
   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
 
   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
-    return False, ("File 'ganeti_api_version' file at %s is not"
-                   " a regular file" % os_dir)
+    return False, ("File '%s' in %s is not a regular file" %
+                   (constants.OS_API_FILE, os_dir))
 
   try:
     api_versions = utils.ReadFile(api_file).splitlines()
 
   try:
     api_versions = utils.ReadFile(api_file).splitlines()
@@ -1525,12 +1989,16 @@ def DiagnoseOS(top_dirs=None):
       search (if not given defaults to
       L{constants.OS_SEARCH_PATH})
   @rtype: list of L{objects.OS}
       search (if not given defaults to
       L{constants.OS_SEARCH_PATH})
   @rtype: list of L{objects.OS}
-  @return: a list of tuples (name, path, status, diagnose)
-      for all (potential) OSes under all search paths, where:
+  @return: a list of tuples (name, path, status, diagnose, variants,
+      parameters, api_version) for all (potential) OSes under all
+      search paths, where:
           - name is the (potential) OS name
           - path is the full path to the OS
           - status True/False is the validity of the OS
           - diagnose is the error message for an invalid OS, otherwise empty
           - name is the (potential) OS name
           - path is the full path to the OS
           - status True/False is the validity of the OS
           - diagnose is the error message for an invalid OS, otherwise empty
+          - variants is a list of supported OS variants, if any
+          - parameters is a list of (name, help) parameters, if any
+          - api_version is a list of support OS API versions
 
   """
   if top_dirs is None:
 
   """
   if top_dirs is None:
@@ -1545,13 +2013,18 @@ def DiagnoseOS(top_dirs=None):
         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
         break
       for name in f_names:
         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
         break
       for name in f_names:
-        os_path = os.path.sep.join([dir_name, name])
+        os_path = utils.PathJoin(dir_name, name)
         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
         if status:
           diagnose = ""
         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
         if status:
           diagnose = ""
+          variants = os_inst.supported_variants
+          parameters = os_inst.supported_parameters
+          api_versions = os_inst.api_versions
         else:
           diagnose = os_inst
         else:
           diagnose = os_inst
-        result.append((name, os_path, status, diagnose))
+          variants = parameters = api_versions = []
+        result.append((name, os_path, status, diagnose, variants,
+                       parameters, api_versions))
 
   return result
 
 
   return result
 
@@ -1572,12 +2045,13 @@ def _TryOSFromDisk(name, base_dir=None):
   """
   if base_dir is None:
     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
   """
   if base_dir is None:
     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
-    if os_dir is None:
-      return False, "Directory for OS %s not found in search path" % name
   else:
   else:
-    os_dir = os.path.sep.join([base_dir, name])
+    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
 
 
-  status, api_versions = _OSOndiskAPIVersion(name, os_dir)
+  if os_dir is None:
+    return False, "Directory for OS %s not found in search path" % name
+
+  status, api_versions = _OSOndiskAPIVersion(os_dir)
   if not status:
     # push the error up
     return status, api_versions
   if not status:
     # push the error up
     return status, api_versions
@@ -1586,31 +2060,70 @@ def _TryOSFromDisk(name, base_dir=None):
     return False, ("API version mismatch for path '%s': found %s, want %s." %
                    (os_dir, api_versions, constants.OS_API_VERSIONS))
 
     return False, ("API version mismatch for path '%s': found %s, want %s." %
                    (os_dir, api_versions, constants.OS_API_VERSIONS))
 
-  # OS Scripts dictionary, we will populate it with the actual script names
-  os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
+  # OS Files dictionary, we will populate it with the absolute path
+  # names; if the value is True, then it is a required file, otherwise
+  # an optional one
+  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)
 
 
-  for script in os_scripts:
-    os_scripts[script] = os.path.sep.join([os_dir, script])
+  if max(api_versions) >= constants.OS_API_V15:
+    os_files[constants.OS_VARIANTS_FILE] = False
+
+  if max(api_versions) >= constants.OS_API_V20:
+    os_files[constants.OS_PARAMETERS_FILE] = True
+  else:
+    del os_files[constants.OS_SCRIPT_VERIFY]
+
+  for (filename, required) in os_files.items():
+    os_files[filename] = utils.PathJoin(os_dir, filename)
 
     try:
 
     try:
-      st = os.stat(os_scripts[script])
+      st = os.stat(os_files[filename])
     except EnvironmentError, err:
     except EnvironmentError, err:
-      return False, ("Script '%s' under path '%s' is missing (%s)" %
-                     (script, os_dir, _ErrnoOrStr(err)))
-
-    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
-      return False, ("Script '%s' under path '%s' is not executable" %
-                     (script, os_dir))
+      if err.errno == errno.ENOENT and not required:
+        del os_files[filename]
+        continue
+      return False, ("File '%s' under path '%s' is missing (%s)" %
+                     (filename, os_dir, _ErrnoOrStr(err)))
 
     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
 
     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
-      return False, ("Script '%s' under path '%s' is not a regular file" %
-                     (script, os_dir))
+      return False, ("File '%s' under path '%s' is not a regular file" %
+                     (filename, os_dir))
+
+    if filename in constants.OS_SCRIPTS:
+      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
+        return False, ("File '%s' under path '%s' is not executable" %
+                       (filename, os_dir))
+
+  variants = []
+  if constants.OS_VARIANTS_FILE in os_files:
+    variants_file = os_files[constants.OS_VARIANTS_FILE]
+    try:
+      variants = utils.ReadFile(variants_file).splitlines()
+    except EnvironmentError, err:
+      # we accept missing files, but not other errors
+      if err.errno != errno.ENOENT:
+        return False, ("Error while reading the OS variants file at %s: %s" %
+                       (variants_file, _ErrnoOrStr(err)))
+
+  parameters = []
+  if constants.OS_PARAMETERS_FILE in os_files:
+    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
+    try:
+      parameters = utils.ReadFile(parameters_file).splitlines()
+    except EnvironmentError, err:
+      return False, ("Error while reading the OS parameters file at %s: %s" %
+                     (parameters_file, _ErrnoOrStr(err)))
+    parameters = [v.split(None, 1) for v in parameters]
 
   os_obj = objects.OS(name=name, path=os_dir,
 
   os_obj = objects.OS(name=name, path=os_dir,
-                      create_script=os_scripts[constants.OS_SCRIPT_CREATE],
-                      export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
-                      import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
-                      rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
+                      create_script=os_files[constants.OS_SCRIPT_CREATE],
+                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
+                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
+                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
+                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
+                                                 None),
+                      supported_variants=variants,
+                      supported_parameters=parameters,
                       api_versions=api_versions)
   return True, os_obj
 
                       api_versions=api_versions)
   return True, os_obj
 
@@ -1633,7 +2146,8 @@ def OSFromDisk(name, base_dir=None):
   @raise RPCFail: if we don't find a valid OS
 
   """
   @raise RPCFail: if we don't find a valid OS
 
   """
-  status, payload = _TryOSFromDisk(name, base_dir)
+  name_only = objects.OS.GetName(name)
+  status, payload = _TryOSFromDisk(name_only, base_dir)
 
   if not status:
     _Fail(payload)
 
   if not status:
     _Fail(payload)
@@ -1641,13 +2155,53 @@ def OSFromDisk(name, base_dir=None):
   return payload
 
 
   return payload
 
 
-def OSEnvironment(instance, os, debug=0):
+def OSCoreEnv(os_name, inst_os, os_params, debug=0):
+  """Calculate the basic environment for an os script.
+
+  @type os_name: str
+  @param os_name: full operating system name (including variant)
+  @type inst_os: L{objects.OS}
+  @param inst_os: operating system for which the environment is being built
+  @type os_params: dict
+  @param os_params: the OS parameters
+  @type debug: integer
+  @param debug: debug level (0 or 1, for OS Api 10)
+  @rtype: dict
+  @return: dict of environment variables
+  @raise errors.BlockDeviceError: if the block device
+      cannot be found
+
+  """
+  result = {}
+  api_version = \
+    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
+  result["OS_API_VERSION"] = "%d" % api_version
+  result["OS_NAME"] = inst_os.name
+  result["DEBUG_LEVEL"] = "%d" % debug
+
+  # OS variants
+  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
+    variant = objects.OS.GetVariant(os_name)
+    if not variant:
+      variant = inst_os.supported_variants[0]
+  else:
+    variant = ""
+  result["OS_VARIANT"] = variant
+
+  # OS params
+  for pname, pvalue in os_params.items():
+    result["OSP_%s" % pname.upper()] = pvalue
+
+  return result
+
+
+def OSEnvironment(instance, inst_os, debug=0):
   """Calculate the environment for an os script.
 
   @type instance: L{objects.Instance}
   @param instance: target instance for the os script run
   """Calculate the environment for an os script.
 
   @type instance: L{objects.Instance}
   @param instance: target instance for the os script run
-  @type os: L{objects.OS}
-  @param os: operating system for which the environment is being built
+  @type inst_os: L{objects.OS}
+  @param inst_os: operating system for which the environment is being built
   @type debug: integer
   @param debug: debug level (0 or 1, for OS Api 10)
   @rtype: dict
   @type debug: integer
   @param debug: debug level (0 or 1, for OS Api 10)
   @rtype: dict
@@ -1656,51 +2210,54 @@ def OSEnvironment(instance, os, debug=0):
       cannot be found
 
   """
       cannot be found
 
   """
-  result = {}
-  api_version = max(constants.OS_API_VERSIONS.intersection(os.api_versions))
-  result['OS_API_VERSION'] = '%d' % api_version
-  result['INSTANCE_NAME'] = instance.name
-  result['INSTANCE_OS'] = instance.os
-  result['HYPERVISOR'] = instance.hypervisor
-  result['DISK_COUNT'] = '%d' % len(instance.disks)
-  result['NIC_COUNT'] = '%d' % len(instance.nics)
-  result['DEBUG_LEVEL'] = '%d' % debug
+  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
+
+  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
+    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
+
+  result["HYPERVISOR"] = instance.hypervisor
+  result["DISK_COUNT"] = "%d" % len(instance.disks)
+  result["NIC_COUNT"] = "%d" % len(instance.nics)
+  result["INSTANCE_SECONDARY_NODES"] = \
+      ("%s" % " ".join(instance.secondary_nodes))
+
+  # Disks
   for idx, disk in enumerate(instance.disks):
   for idx, disk in enumerate(instance.disks):
-    real_disk = _RecursiveFindBD(disk)
-    if real_disk is None:
-      raise errors.BlockDeviceError("Block device '%s' is not set up" %
-                                    str(disk))
-    real_disk.Open()
-    result['DISK_%d_PATH' % idx] = real_disk.dev_path
-    result['DISK_%d_ACCESS' % idx] = disk.mode
+    real_disk = _OpenRealBD(disk)
+    result["DISK_%d_PATH" % idx] = real_disk.dev_path
+    result["DISK_%d_ACCESS" % idx] = disk.mode
     if constants.HV_DISK_TYPE in instance.hvparams:
     if constants.HV_DISK_TYPE in instance.hvparams:
-      result['DISK_%d_FRONTEND_TYPE' % idx] = \
+      result["DISK_%d_FRONTEND_TYPE" % idx] = \
         instance.hvparams[constants.HV_DISK_TYPE]
     if disk.dev_type in constants.LDS_BLOCK:
         instance.hvparams[constants.HV_DISK_TYPE]
     if disk.dev_type in constants.LDS_BLOCK:
-      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
+      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
     elif disk.dev_type == constants.LD_FILE:
     elif disk.dev_type == constants.LD_FILE:
-      result['DISK_%d_BACKEND_TYPE' % idx] = \
-        'file:%s' % disk.physical_id[0]
+      result["DISK_%d_BACKEND_TYPE" % idx] = \
+        "file:%s" % disk.physical_id[0]
+
+  # NICs
   for idx, nic in enumerate(instance.nics):
   for idx, nic in enumerate(instance.nics):
-    result['NIC_%d_MAC' % idx] = nic.mac
+    result["NIC_%d_MAC" % idx] = nic.mac
     if nic.ip:
     if nic.ip:
-      result['NIC_%d_IP' % idx] = nic.ip
-    result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
+      result["NIC_%d_IP" % idx] = nic.ip
+    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
-      result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
+      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
     if nic.nicparams[constants.NIC_LINK]:
     if nic.nicparams[constants.NIC_LINK]:
-      result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
+      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
     if constants.HV_NIC_TYPE in instance.hvparams:
     if constants.HV_NIC_TYPE in instance.hvparams:
-      result['NIC_%d_FRONTEND_TYPE' % idx] = \
+      result["NIC_%d_FRONTEND_TYPE" % idx] = \
         instance.hvparams[constants.HV_NIC_TYPE]
 
         instance.hvparams[constants.HV_NIC_TYPE]
 
+  # HV/BE params
   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
     for key, value in source.items():
       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
 
   return result
 
   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
     for key, value in source.items():
       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
 
   return result
 
-def BlockdevGrow(disk, amount):
+
+def BlockdevGrow(disk, amount, dryrun):
   """Grow a stack of block devices.
 
   This function is called recursively, with the childrens being the
   """Grow a stack of block devices.
 
   This function is called recursively, with the childrens being the
@@ -1708,10 +2265,14 @@ def BlockdevGrow(disk, amount):
 
   @type disk: L{objects.Disk}
   @param disk: the disk to be grown
 
   @type disk: L{objects.Disk}
   @param disk: the disk to be grown
+  @type amount: integer
+  @param amount: the amount (in mebibytes) to grow with
+  @type dryrun: boolean
+  @param dryrun: whether to execute the operation in simulation mode
+      only, without actually increasing the size
   @rtype: (status, result)
   @rtype: (status, result)
-  @return: a tuple with the status of the operation
-      (True/False), and the errors message if status
-      is False
+  @return: a tuple with the status of the operation (True/False), and
+      the errors message if status is False
 
   """
   r_dev = _RecursiveFindBD(disk)
 
   """
   r_dev = _RecursiveFindBD(disk)
@@ -1719,7 +2280,7 @@ def BlockdevGrow(disk, amount):
     _Fail("Cannot find block device %s", disk)
 
   try:
     _Fail("Cannot find block device %s", disk)
 
   try:
-    r_dev.Grow(amount)
+    r_dev.Grow(amount, dryrun)
   except errors.BlockDeviceError, err:
     _Fail("Failed to grow block device: %s", err, exc=True)
 
   except errors.BlockDeviceError, err:
     _Fail("Failed to grow block device: %s", err, exc=True)
 
@@ -1733,22 +2294,18 @@ def BlockdevSnapshot(disk):
   @type disk: L{objects.Disk}
   @param disk: the disk to be snapshotted
   @rtype: string
   @type disk: L{objects.Disk}
   @param disk: the disk to be snapshotted
   @rtype: string
-  @return: snapshot disk path
+  @return: snapshot disk ID as (vg, lv)
 
   """
 
   """
-  if disk.children:
-    if len(disk.children) == 1:
-      # only one child, let's recurse on it
-      return BlockdevSnapshot(disk.children[0])
-    else:
-      # more than one child, choose one that matches
-      for child in disk.children:
-        if child.size == disk.size:
-          # return implies breaking the loop
-          return BlockdevSnapshot(child)
+  if disk.dev_type == constants.LD_DRBD8:
+    if not disk.children:
+      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
+            disk.unique_id)
+    return BlockdevSnapshot(disk.children[0])
   elif disk.dev_type == constants.LD_LV:
     r_dev = _RecursiveFindBD(disk)
     if r_dev is not None:
   elif disk.dev_type == constants.LD_LV:
     r_dev = _RecursiveFindBD(disk)
     if r_dev is not None:
+      # FIXME: choose a saner value for the snapshot size
       # let's stay on the safe side and ask for the full size, for now
       return r_dev.Snapshot(disk.size)
     else:
       # let's stay on the safe side and ask for the full size, for now
       return r_dev.Snapshot(disk.size)
     else:
@@ -1758,68 +2315,6 @@ def BlockdevSnapshot(disk):
           disk.unique_id, disk.dev_type)
 
 
           disk.unique_id, disk.dev_type)
 
 
-def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
-  """Export a block device snapshot to a remote node.
-
-  @type disk: L{objects.Disk}
-  @param disk: the description of the disk to export
-  @type dest_node: str
-  @param dest_node: the destination node to export to
-  @type instance: L{objects.Instance}
-  @param instance: the instance object to whom the disk belongs
-  @type cluster_name: str
-  @param cluster_name: the cluster name, needed for SSH hostalias
-  @type idx: int
-  @param idx: the index of the disk in the instance's disk list,
-      used to export to the OS scripts environment
-  @rtype: None
-
-  """
-  inst_os = OSFromDisk(instance.os)
-  export_env = OSEnvironment(instance, inst_os)
-
-  export_script = inst_os.export_script
-
-  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
-                                     instance.name, int(time.time()))
-  if not os.path.exists(constants.LOG_OS_DIR):
-    os.mkdir(constants.LOG_OS_DIR, 0750)
-  real_disk = _RecursiveFindBD(disk)
-  if real_disk is None:
-    _Fail("Block device '%s' is not set up", disk)
-
-  real_disk.Open()
-
-  export_env['EXPORT_DEVICE'] = real_disk.dev_path
-  export_env['EXPORT_INDEX'] = str(idx)
-
-  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
-  destfile = disk.physical_id[1]
-
-  # the target command is built out of three individual commands,
-  # which are joined by pipes; we check each individual command for
-  # valid parameters
-  expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
-                               export_script, logfile)
-
-  comprcmd = "gzip"
-
-  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
-                                destdir, destdir, destfile)
-  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
-                                                   constants.GANETI_RUNAS,
-                                                   destcmd)
-
-  # all commands have been checked, so we're safe to combine them
-  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
-
-  result = utils.RunCmd(command, env=export_env)
-
-  if result.failed:
-    _Fail("OS snapshot export command '%s' returned error: %s"
-          " output: %s", command, result.fail_reason, result.output)
-
-
 def FinalizeExport(instance, snap_disks):
   """Write out the export configuration information.
 
 def FinalizeExport(instance, snap_disks):
   """Write out the export configuration information.
 
@@ -1833,53 +2328,71 @@ def FinalizeExport(instance, snap_disks):
   @rtype: None
 
   """
   @rtype: None
 
   """
-  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
-  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
+  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
+  finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
 
   config = objects.SerializableConfigParser()
 
   config.add_section(constants.INISECT_EXP)
 
   config = objects.SerializableConfigParser()
 
   config.add_section(constants.INISECT_EXP)
-  config.set(constants.INISECT_EXP, 'version', '0')
-  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
-  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
-  config.set(constants.INISECT_EXP, 'os', instance.os)
-  config.set(constants.INISECT_EXP, 'compression', 'gzip')
+  config.set(constants.INISECT_EXP, "version", "0")
+  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
+  config.set(constants.INISECT_EXP, "source", instance.primary_node)
+  config.set(constants.INISECT_EXP, "os", instance.os)
+  config.set(constants.INISECT_EXP, "compression", "none")
 
   config.add_section(constants.INISECT_INS)
 
   config.add_section(constants.INISECT_INS)
-  config.set(constants.INISECT_INS, 'name', instance.name)
-  config.set(constants.INISECT_INS, 'memory', '%d' %
+  config.set(constants.INISECT_INS, "name", instance.name)
+  config.set(constants.INISECT_INS, "memory", "%d" %
              instance.beparams[constants.BE_MEMORY])
              instance.beparams[constants.BE_MEMORY])
-  config.set(constants.INISECT_INS, 'vcpus', '%d' %
+  config.set(constants.INISECT_INS, "vcpus", "%d" %
              instance.beparams[constants.BE_VCPUS])
              instance.beparams[constants.BE_VCPUS])
-  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
+  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
+  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
+  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))
 
   nic_total = 0
   for nic_count, nic in enumerate(instance.nics):
     nic_total += 1
 
   nic_total = 0
   for nic_count, nic in enumerate(instance.nics):
     nic_total += 1
-    config.set(constants.INISECT_INS, 'nic%d_mac' %
-               nic_count, '%s' % nic.mac)
-    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
-    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
-               '%s' % nic.bridge)
+    config.set(constants.INISECT_INS, "nic%d_mac" %
+               nic_count, "%s" % nic.mac)
+    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
+    for param in constants.NICS_PARAMETER_TYPES:
+      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
+                 "%s" % nic.nicparams.get(param, None))
   # TODO: redundant: on load can read nics until it doesn't exist
   # TODO: redundant: on load can read nics until it doesn't exist
-  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
+  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)
 
   disk_total = 0
   for disk_count, disk in enumerate(snap_disks):
     if disk:
       disk_total += 1
 
   disk_total = 0
   for disk_count, disk in enumerate(snap_disks):
     if disk:
       disk_total += 1
-      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
-                 ('%s' % disk.iv_name))
-      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
-                 ('%s' % disk.physical_id[1]))
-      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
-                 ('%d' % disk.size))
+      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
+                 ("%s" % disk.iv_name))
+      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
+                 ("%s" % disk.physical_id[1]))
+      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
+                 ("%d" % disk.size))
+
+  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)
+
+  # New-style hypervisor/backend parameters
+
+  config.add_section(constants.INISECT_HYP)
+  for name, value in instance.hvparams.items():
+    if name not in constants.HVC_GLOBALS:
+      config.set(constants.INISECT_HYP, name, str(value))
+
+  config.add_section(constants.INISECT_BEP)
+  for name, value in instance.beparams.items():
+    config.set(constants.INISECT_BEP, name, str(value))
 
 
-  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
+  config.add_section(constants.INISECT_OSP)
+  for name, value in instance.osparams.items():
+    config.set(constants.INISECT_OSP, name, str(value))
 
 
-  utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
+  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                   data=config.Dumps())
                   data=config.Dumps())
-  shutil.rmtree(finaldestdir, True)
+  shutil.rmtree(finaldestdir, ignore_errors=True)
   shutil.move(destdir, finaldestdir)
 
 
   shutil.move(destdir, finaldestdir)
 
 
@@ -1894,7 +2407,7 @@ def ExportInfo(dest):
       export info
 
   """
       export info
 
   """
-  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
+  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
 
   config = objects.SerializableConfigParser()
   config.read(cff)
 
   config = objects.SerializableConfigParser()
   config.read(cff)
@@ -1906,54 +2419,6 @@ def ExportInfo(dest):
   return config.Dumps()
 
 
   return config.Dumps()
 
 
-def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
-  """Import an os image into an instance.
-
-  @type instance: L{objects.Instance}
-  @param instance: instance to import the disks into
-  @type src_node: string
-  @param src_node: source node for the disk images
-  @type src_images: list of string
-  @param src_images: absolute paths of the disk images
-  @rtype: list of boolean
-  @return: each boolean represent the success of importing the n-th disk
-
-  """
-  inst_os = OSFromDisk(instance.os)
-  import_env = OSEnvironment(instance, inst_os)
-  import_script = inst_os.import_script
-
-  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
-                                        instance.name, int(time.time()))
-  if not os.path.exists(constants.LOG_OS_DIR):
-    os.mkdir(constants.LOG_OS_DIR, 0750)
-
-  comprcmd = "gunzip"
-  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
-                               import_script, logfile)
-
-  final_result = []
-  for idx, image in enumerate(src_images):
-    if image:
-      destcmd = utils.BuildShellCmd('cat %s', image)
-      remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
-                                                       constants.GANETI_RUNAS,
-                                                       destcmd)
-      command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
-      import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
-      import_env['IMPORT_INDEX'] = str(idx)
-      result = utils.RunCmd(command, env=import_env)
-      if result.failed:
-        logging.error("Disk import command '%s' returned error: %s"
-                      " output: %s", command, result.fail_reason,
-                      result.output)
-        final_result.append("error importing disk %d: %s, %s" %
-                            (idx, result.fail_reason, result.output[-100]))
-
-  if final_result:
-    _Fail("; ".join(final_result), log=False)
-
-
 def ListExports():
   """Return a list of exports currently available on this machine.
 
 def ListExports():
   """Return a list of exports currently available on this machine.
 
@@ -1962,7 +2427,7 @@ def ListExports():
 
   """
   if os.path.isdir(constants.EXPORT_DIR):
 
   """
   if os.path.isdir(constants.EXPORT_DIR):
-    return utils.ListVisibleFiles(constants.EXPORT_DIR)
+    return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
   else:
     _Fail("No exports directory")
 
   else:
     _Fail("No exports directory")
 
@@ -1975,7 +2440,7 @@ def RemoveExport(export):
   @rtype: None
 
   """
   @rtype: None
 
   """
-  target = os.path.join(constants.EXPORT_DIR, export)
+  target = utils.PathJoin(constants.EXPORT_DIR, export)
 
   try:
     shutil.rmtree(target)
 
   try:
     shutil.rmtree(target)
@@ -2024,27 +2489,31 @@ def BlockdevRename(devlist):
     _Fail("; ".join(msgs))
 
 
     _Fail("; ".join(msgs))
 
 
-def _TransformFileStorageDir(file_storage_dir):
+def _TransformFileStorageDir(fs_dir):
   """Checks whether given file_storage_dir is valid.
 
   """Checks whether given file_storage_dir is valid.
 
-  Checks wheter the given file_storage_dir is within the cluster-wide
-  default file_storage_dir stored in SimpleStore. Only paths under that
-  directory are allowed.
+  Checks wheter the given fs_dir is within the cluster-wide default
+  file_storage_dir or the shared_file_storage_dir, which are stored in
+  SimpleStore. Only paths under those directories are allowed.
 
 
-  @type file_storage_dir: str
-  @param file_storage_dir: the path to check
+  @type fs_dir: str
+  @param fs_dir: the path to check
 
   @return: the normalized path if valid, None otherwise
 
   """
 
   @return: the normalized path if valid, None otherwise
 
   """
+  if not constants.ENABLE_FILE_STORAGE:
+    _Fail("File storage disabled at configure time")
   cfg = _GetConfig()
   cfg = _GetConfig()
-  file_storage_dir = os.path.normpath(file_storage_dir)
-  base_file_storage_dir = cfg.GetFileStorageDir()
-  if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
-      base_file_storage_dir):
+  fs_dir = os.path.normpath(fs_dir)
+  base_fstore = cfg.GetFileStorageDir()
+  base_shared = cfg.GetSharedFileStorageDir()
+  if not (utils.IsBelowDir(base_fstore, fs_dir) or
+          utils.IsBelowDir(base_shared, fs_dir)):
     _Fail("File storage directory '%s' is not under base file"
     _Fail("File storage directory '%s' is not under base file"
-          " storage directory '%s'", file_storage_dir, base_file_storage_dir)
-  return file_storage_dir
+          " storage directory '%s' or shared storage directory '%s'",
+          fs_dir, base_fstore, base_shared)
+  return fs_dir
 
 
 def CreateFileStorageDir(file_storage_dir):
 
 
 def CreateFileStorageDir(file_storage_dir):
@@ -2146,7 +2615,7 @@ def _EnsureJobQueueFile(file_name):
 def JobQueueUpdate(file_name, content):
   """Updates a file in the queue directory.
 
 def JobQueueUpdate(file_name, content):
   """Updates a file in the queue directory.
 
-  This is just a wrapper over L{utils.WriteFile}, with proper
+  This is just a wrapper over L{utils.io.WriteFile}, with proper
   checking.
 
   @type file_name: str
   checking.
 
   @type file_name: str
@@ -2158,9 +2627,11 @@ def JobQueueUpdate(file_name, content):
 
   """
   _EnsureJobQueueFile(file_name)
 
   """
   _EnsureJobQueueFile(file_name)
+  getents = runtime.GetEnts()
 
   # Write and replace the file atomically
 
   # Write and replace the file atomically
-  utils.WriteFile(file_name, data=_Decompress(content))
+  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
+                  gid=getents.masterd_gid)
 
 
 def JobQueueRename(old, new):
 
 
 def JobQueueRename(old, new):
@@ -2182,24 +2653,6 @@ def JobQueueRename(old, new):
   utils.RenameFile(old, new, mkdir=True)
 
 
   utils.RenameFile(old, new, mkdir=True)
 
 
-def JobQueueSetDrainFlag(drain_flag):
-  """Set the drain flag for the queue.
-
-  This will set or unset the queue drain flag.
-
-  @type drain_flag: boolean
-  @param drain_flag: if True, will set the drain flag, otherwise reset it.
-  @rtype: truple
-  @return: always True, None
-  @warning: the function always returns True
-
-  """
-  if drain_flag:
-    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
-  else:
-    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
-
-
 def BlockdevClose(instance_name, disks):
   """Closes the given block devices.
 
 def BlockdevClose(instance_name, disks):
   """Closes the given block devices.
 
@@ -2254,6 +2707,70 @@ def ValidateHVParams(hvname, hvparams):
     _Fail(str(err), log=False)
 
 
     _Fail(str(err), log=False)
 
 
+def _CheckOSPList(os_obj, parameters):
+  """Check whether a list of parameters is supported by the OS.
+
+  @type os_obj: L{objects.OS}
+  @param os_obj: OS object to check
+  @type parameters: list
+  @param parameters: the list of parameters to check
+
+  """
+  supported = [v[0] for v in os_obj.supported_parameters]
+  delta = frozenset(parameters).difference(supported)
+  if delta:
+    _Fail("The following parameters are not supported"
+          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
+
+
+def ValidateOS(required, osname, checks, osparams):
+  """Validate the given OS' parameters.
+
+  @type required: boolean
+  @param required: whether absence of the OS should translate into
+      failure or not
+  @type osname: string
+  @param osname: the OS to be validated
+  @type checks: list
+  @param checks: list of the checks to run (currently only 'parameters')
+  @type osparams: dict
+  @param osparams: dictionary with OS parameters
+  @rtype: boolean
+  @return: True if the validation passed, or False if the OS was not
+      found and L{required} was false
+
+  """
+  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
+    _Fail("Unknown checks required for OS %s: %s", osname,
+          set(checks).difference(constants.OS_VALIDATE_CALLS))
+
+  name_only = objects.OS.GetName(osname)
+  status, tbv = _TryOSFromDisk(name_only, None)
+
+  if not status:
+    if required:
+      _Fail(tbv)
+    else:
+      return False
+
+  if max(tbv.api_versions) < constants.OS_API_V20:
+    return True
+
+  if constants.OS_VALIDATE_PARAMETERS in checks:
+    _CheckOSPList(tbv, osparams.keys())
+
+  validate_env = OSCoreEnv(osname, tbv, osparams)
+  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
+                        cwd=tbv.path, reset_env=True)
+  if result.failed:
+    logging.error("os validate command '%s' returned error: %s output: %s",
+                  result.cmd, result.fail_reason, result.output)
+    _Fail("OS validation script failed (%s), output: %s",
+          result.fail_reason, result.output, log=False)
+
+  return True
+
+
 def DemoteFromMC():
   """Demotes the current node from master candidate role.
 
 def DemoteFromMC():
   """Demotes the current node from master candidate role.
 
@@ -2262,24 +2779,402 @@ def DemoteFromMC():
   master, myself = ssconf.GetMasterAndMyself()
   if master == myself:
     _Fail("ssconf status shows I'm the master node, will not demote")
   master, myself = ssconf.GetMasterAndMyself()
   if master == myself:
     _Fail("ssconf status shows I'm the master node, will not demote")
-  pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
-  if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
+
+  result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
+  if not result.failed:
     _Fail("The master daemon is running, will not demote")
     _Fail("The master daemon is running, will not demote")
+
   try:
     if os.path.isfile(constants.CLUSTER_CONF_FILE):
       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
   except EnvironmentError, err:
     if err.errno != errno.ENOENT:
       _Fail("Error while backing up cluster file: %s", err, exc=True)
   try:
     if os.path.isfile(constants.CLUSTER_CONF_FILE):
       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
   except EnvironmentError, err:
     if err.errno != errno.ENOENT:
       _Fail("Error while backing up cluster file: %s", err, exc=True)
+
   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
 
 
   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
 
 
+def _GetX509Filenames(cryptodir, name):
+  """Returns the full paths for the private key and certificate.
+
+  """
+  return (utils.PathJoin(cryptodir, name),
+          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
+          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
+
+
+def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
+  """Creates a new X509 certificate for SSL/TLS.
+
+  @type validity: int
+  @param validity: Validity in seconds
+  @rtype: tuple; (string, string)
+  @return: Certificate name and public part
+
+  """
+  (key_pem, cert_pem) = \
+    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
+                                     min(validity, _MAX_SSL_CERT_VALIDITY))
+
+  cert_dir = tempfile.mkdtemp(dir=cryptodir,
+                              prefix="x509-%s-" % utils.TimestampForFilename())
+  try:
+    name = os.path.basename(cert_dir)
+    assert len(name) > 5
+
+    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
+
+    utils.WriteFile(key_file, mode=0400, data=key_pem)
+    utils.WriteFile(cert_file, mode=0400, data=cert_pem)
+
+    # Never return private key as it shouldn't leave the node
+    return (name, cert_pem)
+  except Exception:
+    shutil.rmtree(cert_dir, ignore_errors=True)
+    raise
+
+
+def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
+  """Removes a X509 certificate.
+
+  @type name: string
+  @param name: Certificate name
+
+  """
+  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
+
+  utils.RemoveFile(key_file)
+  utils.RemoveFile(cert_file)
+
+  try:
+    os.rmdir(cert_dir)
+  except EnvironmentError, err:
+    _Fail("Cannot remove certificate directory '%s': %s",
+          cert_dir, err)
+
+
+def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
+  """Returns the command for the requested input/output.
+
+  @type instance: L{objects.Instance}
+  @param instance: The instance object
+  @param mode: Import/export mode
+  @param ieio: Input/output type
+  @param ieargs: Input/output arguments
+
+  """
+  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
+
+  env = None
+  prefix = None
+  suffix = None
+  exp_size = None
+
+  if ieio == constants.IEIO_FILE:
+    (filename, ) = ieargs
+
+    if not utils.IsNormAbsPath(filename):
+      _Fail("Path '%s' is not normalized or absolute", filename)
+
+    real_filename = os.path.realpath(filename)
+    directory = os.path.dirname(real_filename)
+
+    if not utils.IsBelowDir(constants.EXPORT_DIR, real_filename):
+      _Fail("File '%s' is not under exports directory '%s': %s",
+            filename, constants.EXPORT_DIR, real_filename)
+
+    # Create directory
+    utils.Makedirs(directory, mode=0750)
+
+    quoted_filename = utils.ShellQuote(filename)
+
+    if mode == constants.IEM_IMPORT:
+      suffix = "> %s" % quoted_filename
+    elif mode == constants.IEM_EXPORT:
+      suffix = "< %s" % quoted_filename
+
+      # Retrieve file size
+      try:
+        st = os.stat(filename)
+      except EnvironmentError, err:
+        logging.error("Can't stat(2) %s: %s", filename, err)
+      else:
+        exp_size = utils.BytesToMebibyte(st.st_size)
+
+  elif ieio == constants.IEIO_RAW_DISK:
+    (disk, ) = ieargs
+
+    real_disk = _OpenRealBD(disk)
+
+    if mode == constants.IEM_IMPORT:
+      # we set here a smaller block size as, due to transport buffering, more
+      # than 64-128k will mostly ignored; we use nocreat to fail if the device
+      # is not already there or we pass a wrong path; we use notrunc to no
+      # attempt truncate on an LV device; we use oflag=dsync to not buffer too
+      # much memory; this means that at best, we flush every 64k, which will
+      # not be very fast
+      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
+                                    " bs=%s oflag=dsync"),
+                                    real_disk.dev_path,
+                                    str(64 * 1024))
+
+    elif mode == constants.IEM_EXPORT:
+      # the block size on the read dd is 1MiB to match our units
+      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
+                                   real_disk.dev_path,
+                                   str(1024 * 1024), # 1 MB
+                                   str(disk.size))
+      exp_size = disk.size
+
+  elif ieio == constants.IEIO_SCRIPT:
+    (disk, disk_index, ) = ieargs
+
+    assert isinstance(disk_index, (int, long))
+
+    real_disk = _OpenRealBD(disk)
+
+    inst_os = OSFromDisk(instance.os)
+    env = OSEnvironment(instance, inst_os)
+
+    if mode == constants.IEM_IMPORT:
+      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
+      env["IMPORT_INDEX"] = str(disk_index)
+      script = inst_os.import_script
+
+    elif mode == constants.IEM_EXPORT:
+      env["EXPORT_DEVICE"] = real_disk.dev_path
+      env["EXPORT_INDEX"] = str(disk_index)
+      script = inst_os.export_script
+
+    # TODO: Pass special environment only to script
+    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
+
+    if mode == constants.IEM_IMPORT:
+      suffix = "| %s" % script_cmd
+
+    elif mode == constants.IEM_EXPORT:
+      prefix = "%s |" % script_cmd
+
+    # Let script predict size
+    exp_size = constants.IE_CUSTOM_SIZE
+
+  else:
+    _Fail("Invalid %s I/O mode %r", mode, ieio)
+
+  return (env, prefix, suffix, exp_size)
+
+
+def _CreateImportExportStatusDir(prefix):
+  """Creates status directory for import/export.
+
+  """
+  return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
+                          prefix=("%s-%s-" %
+                                  (prefix, utils.TimestampForFilename())))
+
+
+def StartImportExportDaemon(mode, opts, host, port, instance, component,
+                            ieio, ieioargs):
+  """Starts an import or export daemon.
+
+  @param mode: Import/output mode
+  @type opts: L{objects.ImportExportOptions}
+  @param opts: Daemon options
+  @type host: string
+  @param host: Remote host for export (None for import)
+  @type port: int
+  @param port: Remote port for export (None for import)
+  @type instance: L{objects.Instance}
+  @param instance: Instance object
+  @type component: string
+  @param component: which part of the instance is transferred now,
+      e.g. 'disk/0'
+  @param ieio: Input/output type
+  @param ieioargs: Input/output arguments
+
+  """
+  if mode == constants.IEM_IMPORT:
+    prefix = "import"
+
+    if not (host is None and port is None):
+      _Fail("Can not specify host or port on import")
+
+  elif mode == constants.IEM_EXPORT:
+    prefix = "export"
+
+    if host is None or port is None:
+      _Fail("Host and port must be specified for an export")
+
+  else:
+    _Fail("Invalid mode %r", mode)
+
+  if (opts.key_name is None) ^ (opts.ca_pem is None):
+    _Fail("Cluster certificate can only be used for both key and CA")
+
+  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
+    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
+
+  if opts.key_name is None:
+    # Use server.pem
+    key_path = constants.NODED_CERT_FILE
+    cert_path = constants.NODED_CERT_FILE
+    assert opts.ca_pem is None
+  else:
+    (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
+                                                 opts.key_name)
+    assert opts.ca_pem is not None
+
+  for i in [key_path, cert_path]:
+    if not os.path.exists(i):
+      _Fail("File '%s' does not exist" % i)
+
+  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
+  try:
+    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
+    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
+    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
+
+    if opts.ca_pem is None:
+      # Use server.pem
+      ca = utils.ReadFile(constants.NODED_CERT_FILE)
+    else:
+      ca = opts.ca_pem
+
+    # Write CA file
+    utils.WriteFile(ca_file, data=ca, mode=0400)
+
+    cmd = [
+      constants.IMPORT_EXPORT_DAEMON,
+      status_file, mode,
+      "--key=%s" % key_path,
+      "--cert=%s" % cert_path,
+      "--ca=%s" % ca_file,
+      ]
+
+    if host:
+      cmd.append("--host=%s" % host)
+
+    if port:
+      cmd.append("--port=%s" % port)
+
+    if opts.ipv6:
+      cmd.append("--ipv6")
+    else:
+      cmd.append("--ipv4")
+
+    if opts.compress:
+      cmd.append("--compress=%s" % opts.compress)
+
+    if opts.magic:
+      cmd.append("--magic=%s" % opts.magic)
+
+    if exp_size is not None:
+      cmd.append("--expected-size=%s" % exp_size)
+
+    if cmd_prefix:
+      cmd.append("--cmd-prefix=%s" % cmd_prefix)
+
+    if cmd_suffix:
+      cmd.append("--cmd-suffix=%s" % cmd_suffix)
+
+    if mode == constants.IEM_EXPORT:
+      # Retry connection a few times when connecting to remote peer
+      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
+      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
+    elif opts.connect_timeout is not None:
+      assert mode == constants.IEM_IMPORT
+      # Overall timeout for establishing connection while listening
+      cmd.append("--connect-timeout=%s" % opts.connect_timeout)
+
+    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)
+
+    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
+    # support for receiving a file descriptor for output
+    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
+                      output=logfile)
+
+    # The import/export name is simply the status directory name
+    return os.path.basename(status_dir)
+
+  except Exception:
+    shutil.rmtree(status_dir, ignore_errors=True)
+    raise
+
+
+def GetImportExportStatus(names):
+  """Returns import/export daemon status.
+
+  @type names: sequence
+  @param names: List of names
+  @rtype: List of dicts
+  @return: Returns a list of the state of each named import/export or None if a
+           status couldn't be read
+
+  """
+  result = []
+
+  for name in names:
+    status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
+                                 _IES_STATUS_FILE)
+
+    try:
+      data = utils.ReadFile(status_file)
+    except EnvironmentError, err:
+      if err.errno != errno.ENOENT:
+        raise
+      data = None
+
+    if not data:
+      result.append(None)
+      continue
+
+    result.append(serializer.LoadJson(data))
+
+  return result
+
+
+def AbortImportExport(name):
+  """Sends SIGTERM to a running import/export daemon.
+
+  """
+  logging.info("Abort import/export %s", name)
+
+  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
+  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
+
+  if pid:
+    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
+                 name, pid)
+    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
+
+
+def CleanupImportExport(name):
+  """Cleanup after an import or export.
+
+  If the import/export daemon is still running it's killed. Afterwards the
+  whole status directory is removed.
+
+  """
+  logging.info("Finalizing import/export %s", name)
+
+  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
+
+  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
+
+  if pid:
+    logging.info("Import/export %s is still running with PID %s",
+                 name, pid)
+    utils.KillProcess(pid, waitpid=False)
+
+  shutil.rmtree(status_dir, ignore_errors=True)
+
+
 def _FindDisks(nodes_ip, disks):
   """Sets the physical ID on disks and returns the block devices.
 
   """
   # set the correct physical ID
 def _FindDisks(nodes_ip, disks):
   """Sets the physical ID on disks and returns the block devices.
 
   """
   # set the correct physical ID
-  my_name = utils.HostInfo().name
+  my_name = netutils.Hostname.GetSysName()
   for cf in disks:
     cf.SetPhysicalID(my_name, nodes_ip)
 
   for cf in disks:
     cf.SetPhysicalID(my_name, nodes_ip)
 
@@ -2327,20 +3222,22 @@ def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
       rd.AttachNet(multimaster)
     except errors.BlockDeviceError, err:
       _Fail("Can't change network configuration: %s", err)
       rd.AttachNet(multimaster)
     except errors.BlockDeviceError, err:
       _Fail("Can't change network configuration: %s", err)
+
   # wait until the disks are connected; we need to retry the re-attach
   # if the device becomes standalone, as this might happen if the one
   # node disconnects and reconnects in a different mode before the
   # other node reconnects; in this case, one or both of the nodes will
   # decide it has wrong configuration and switch to standalone
   # wait until the disks are connected; we need to retry the re-attach
   # if the device becomes standalone, as this might happen if the one
   # node disconnects and reconnects in a different mode before the
   # other node reconnects; in this case, one or both of the nodes will
   # decide it has wrong configuration and switch to standalone
-  RECONNECT_TIMEOUT = 2 * 60
-  sleep_time = 0.100 # start with 100 miliseconds
-  timeout_limit = time.time() + RECONNECT_TIMEOUT
-  while time.time() < timeout_limit:
+
+  def _Attach():
     all_connected = True
     all_connected = True
+
     for rd in bdevs:
       stats = rd.GetProcStatus()
     for rd in bdevs:
       stats = rd.GetProcStatus()
-      if not (stats.is_connected or stats.is_in_resync):
-        all_connected = False
+
+      all_connected = (all_connected and
+                       (stats.is_connected or stats.is_in_resync))
+
       if stats.is_standalone:
         # peer had different config info and this node became
         # standalone, even though this should not happen with the
       if stats.is_standalone:
         # peer had different config info and this node became
         # standalone, even though this should not happen with the
@@ -2349,12 +3246,16 @@ def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
           rd.AttachNet(multimaster)
         except errors.BlockDeviceError, err:
           _Fail("Can't change network configuration: %s", err)
           rd.AttachNet(multimaster)
         except errors.BlockDeviceError, err:
           _Fail("Can't change network configuration: %s", err)
-    if all_connected:
-      break
-    time.sleep(sleep_time)
-    sleep_time = min(5, sleep_time * 1.5)
-  if not all_connected:
+
+    if not all_connected:
+      raise utils.RetryAgain()
+
+  try:
+    # Start with a delay of 100 miliseconds and go up to 5 seconds
+    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
+  except utils.RetryTimeout:
     _Fail("Timeout in disk reconnecting")
     _Fail("Timeout in disk reconnecting")
+
   if multimaster:
     # change to primary mode
     for rd in bdevs:
   if multimaster:
     # change to primary mode
     for rd in bdevs:
@@ -2368,14 +3269,25 @@ def DrbdWaitSync(nodes_ip, disks):
   """Wait until DRBDs have synchronized.
 
   """
   """Wait until DRBDs have synchronized.
 
   """
+  def _helper(rd):
+    stats = rd.GetProcStatus()
+    if not (stats.is_connected or stats.is_in_resync):
+      raise utils.RetryAgain()
+    return stats
+
   bdevs = _FindDisks(nodes_ip, disks)
 
   min_resync = 100
   alldone = True
   for rd in bdevs:
   bdevs = _FindDisks(nodes_ip, disks)
 
   min_resync = 100
   alldone = True
   for rd in bdevs:
-    stats = rd.GetProcStatus()
-    if not (stats.is_connected or stats.is_in_resync):
-      _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
+    try:
+      # poll each second for 15 seconds
+      stats = utils.Retry(_helper, 1, 15, args=[rd])
+    except utils.RetryTimeout:
+      stats = rd.GetProcStatus()
+      # last check
+      if not (stats.is_connected or stats.is_in_resync):
+        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
     alldone = alldone and (not stats.is_in_resync)
     if stats.sync_percent is not None:
       min_resync = min(min_resync, stats.sync_percent)
     alldone = alldone and (not stats.is_in_resync)
     if stats.sync_percent is not None:
       min_resync = min(min_resync, stats.sync_percent)
@@ -2383,6 +3295,16 @@ def DrbdWaitSync(nodes_ip, disks):
   return (alldone, min_resync)
 
 
   return (alldone, min_resync)
 
 
+def GetDrbdUsermodeHelper():
+  """Returns DRBD usermode helper currently configured.
+
+  """
+  try:
+    return bdev.BaseDRBD.GetUsermodeHelper()
+  except errors.BlockDeviceError, err:
+    _Fail(str(err))
+
+
 def PowercycleNode(hypervisor_type):
   """Hard-powercycle the node.
 
 def PowercycleNode(hypervisor_type):
   """Hard-powercycle the node.
 
@@ -2398,6 +3320,11 @@ def PowercycleNode(hypervisor_type):
     pid = 0
   if pid > 0:
     return "Reboot scheduled in 5 seconds"
     pid = 0
   if pid > 0:
     return "Reboot scheduled in 5 seconds"
+  # ensure the child is running on ram
+  try:
+    utils.Mlockall()
+  except Exception: # pylint: disable=W0703
+    pass
   time.sleep(5)
   hyper.PowercycleNode()
 
   time.sleep(5)
   hyper.PowercycleNode()
 
@@ -2409,8 +3336,6 @@ class HooksRunner(object):
   on the master side.
 
   """
   on the master side.
 
   """
-  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
-
   def __init__(self, hooks_base_dir=None):
     """Constructor for hooks runner.
 
   def __init__(self, hooks_base_dir=None):
     """Constructor for hooks runner.
 
@@ -2421,57 +3346,9 @@ class HooksRunner(object):
     """
     if hooks_base_dir is None:
       hooks_base_dir = constants.HOOKS_BASE_DIR
     """
     if hooks_base_dir is None:
       hooks_base_dir = constants.HOOKS_BASE_DIR
-    self._BASE_DIR = hooks_base_dir
-
-  @staticmethod
-  def ExecHook(script, env):
-    """Exec one hook script.
-
-    @type script: str
-    @param script: the full path to the script
-    @type env: dict
-    @param env: the environment with which to exec the script
-    @rtype: tuple (success, message)
-    @return: a tuple of success and message, where success
-        indicates the succes of the operation, and message
-        which will contain the error details in case we
-        failed
-
-    """
-    # exec the process using subprocess and log the output
-    fdstdin = None
-    try:
-      fdstdin = open("/dev/null", "r")
-      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
-                               stderr=subprocess.STDOUT, close_fds=True,
-                               shell=False, cwd="/", env=env)
-      output = ""
-      try:
-        output = child.stdout.read(4096)
-        child.stdout.close()
-      except EnvironmentError, err:
-        output += "Hook script error: %s" % str(err)
-
-      while True:
-        try:
-          result = child.wait()
-          break
-        except EnvironmentError, err:
-          if err.errno == errno.EINTR:
-            continue
-          raise
-    finally:
-      # try not to leak fds
-      for fd in (fdstdin, ):
-        if fd is not None:
-          try:
-            fd.close()
-          except EnvironmentError, err:
-            # just log the error
-            #logging.exception("Error while closing fd %s", fd)
-            pass
-
-    return result == 0, utils.SafeEncode(output.strip())
+    # yeah, _BASE_DIR is not valid for attributes, we use it like a
+    # constant
+    self._BASE_DIR = hooks_base_dir # pylint: disable=C0103
 
   def RunHooks(self, hpath, phase, env):
     """Run the scripts in the hooks directory.
 
   def RunHooks(self, hpath, phase, env):
     """Run the scripts in the hooks directory.
@@ -2502,34 +3379,34 @@ class HooksRunner(object):
     else:
       _Fail("Unknown hooks phase '%s'", phase)
 
     else:
       _Fail("Unknown hooks phase '%s'", phase)
 
-    rr = []
-
     subdir = "%s-%s.d" % (hpath, suffix)
     subdir = "%s-%s.d" % (hpath, suffix)
-    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
-    try:
-      dir_contents = utils.ListVisibleFiles(dir_name)
-    except OSError:
-      # FIXME: must log output in case of failures
-      return rr
-
-    # we use the standard python sort order,
-    # so 00name is the recommended naming scheme
-    dir_contents.sort()
-    for relname in dir_contents:
-      fname = os.path.join(dir_name, relname)
-      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
-          self.RE_MASK.match(relname) is not None):
+    dir_name = utils.PathJoin(self._BASE_DIR, subdir)
+
+    results = []
+
+    if not os.path.isdir(dir_name):
+      # for non-existing/non-dirs, we simply exit instead of logging a
+      # warning at every operation
+      return results
+
+    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
+
+    for (relname, relstatus, runresult)  in runparts_results:
+      if relstatus == constants.RUNPARTS_SKIP:
         rrval = constants.HKR_SKIP
         output = ""
         rrval = constants.HKR_SKIP
         output = ""
-      else:
-        result, output = self.ExecHook(fname, env)
-        if not result:
+      elif relstatus == constants.RUNPARTS_ERR:
+        rrval = constants.HKR_FAIL
+        output = "Hook script execution error: %s" % runresult
+      elif relstatus == constants.RUNPARTS_RUN:
+        if runresult.failed:
           rrval = constants.HKR_FAIL
         else:
           rrval = constants.HKR_SUCCESS
           rrval = constants.HKR_FAIL
         else:
           rrval = constants.HKR_SUCCESS
-      rr.append(("%s/%s" % (subdir, relname), rrval, output))
+        output = utils.SafeEncode(runresult.output.strip())
+      results.append(("%s/%s" % (subdir, relname), rrval, output))
 
 
-    return rr
+    return results
 
 
 class IAllocatorRunner(object):
 
 
 class IAllocatorRunner(object):
@@ -2539,7 +3416,8 @@ class IAllocatorRunner(object):
   the master side.
 
   """
   the master side.
 
   """
-  def Run(self, name, idata):
+  @staticmethod
+  def Run(name, idata):
     """Run an iallocator script.
 
     @type name: str
     """Run an iallocator script.
 
     @type name: str
@@ -2595,7 +3473,7 @@ class DevCacheManager(object):
     if dev_path.startswith(cls._DEV_PREFIX):
       dev_path = dev_path[len(cls._DEV_PREFIX):]
     dev_path = dev_path.replace("/", "_")
     if dev_path.startswith(cls._DEV_PREFIX):
       dev_path = dev_path[len(cls._DEV_PREFIX):]
     dev_path = dev_path.replace("/", "_")
-    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
+    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
     return fpath
 
   @classmethod
     return fpath
 
   @classmethod
@@ -2636,7 +3514,7 @@ class DevCacheManager(object):
   def RemoveCache(cls, dev_path):
     """Remove data for a dev_path.
 
   def RemoveCache(cls, dev_path):
     """Remove data for a dev_path.
 
-    This is just a wrapper over L{utils.RemoveFile} with a converted
+    This is just a wrapper over L{utils.io.RemoveFile} with a converted
     path name and logging.
 
     @type dev_path: str
     path name and logging.
 
     @type dev_path: str