Merge remote branch 'origin/devel-2.1'
Author:     Michael Hanselmann <hansmi@google.com>
AuthorDate: Mon, 15 Mar 2010 12:54:53 +0000 (13:54 +0100)
Commit:     Michael Hanselmann <hansmi@google.com>
CommitDate: Mon, 15 Mar 2010 14:25:50 +0000 (15:25 +0100)
* origin/devel-2.1: (116 commits)
  Implement replacing cluster certs and keys via “gnt-cluster renew-crypto”
  cli: Add helper function to stop and start whole cluster
  cfgupgrade: Use new bootstrap function for certs and keys
  bootstrap: Add new function to create cluster certs and keys
  utils.CreateBackup: Use human-readable instead of seconds since Epoch
  Add unittest for daemon-util
  Add support for non-Python unittests
  daemon-util: Generate daemon path in separate function
  daemon-util: Use “return” instead of “exit” in all functions
  daemon-util: Add function to start and stop all daemons
  ganeti.initd: Move all daemon names from init script to daemon-util
  ganeti.initd: Move code checking daemon exit code to daemon-util
  ganeti.initd: Move code checking config to daemon-util
  daemon-util: Require dashes in commands
  Improve ganeti.serializer unittests
  Add unittests for ganeti.errors
  Verify cluster certificates in LUVerifyCluster
  utils: Add function to extract X509 cert validity
  Add constant with cluster X509 certificates
  Release version 2.1.1
  ...

Conflicts:
lib/backend.py: Trivial
lib/bootstrap.py: Trivial
lib/constants.py: Trivial
lib/http/server.py: Trivial
lib/utils.py: RunCmd parameter “reset_env”
test/ganeti.utils_unittest.py: Trivial
tools/cfgupgrade: Trivial

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Guido Trotter <ultrotter@google.com>

16 files changed:
Makefile.am
NEWS
daemons/ganeti-rapi
lib/bootstrap.py
lib/cli.py
lib/cmdlib.py
lib/constants.py
lib/http/__init__.py
lib/http/server.py
lib/luxi.py
lib/objects.py
lib/opcodes.py
lib/utils.py
qa/ganeti-qa.py
qa/qa_cluster.py
test/ganeti.utils_unittest.py

diff --combined Makefile.am
@@@ -54,7 -54,7 +54,7 @@@ MAINTAINERCLEANFILES = 
        doc/news.rst
  
  maintainer-clean-local:
-       rm -rf doc/api doc/html
+       rm -rf doc/api doc/html doc/coverage
  
  CLEANFILES = \
        autotools/replace_vars.sed \
@@@ -148,8 -148,6 +148,8 @@@ docrst = 
        doc/admin.rst \
        doc/design-2.0.rst \
        doc/design-2.1.rst \
 +      doc/design-2.2.rst \
 +      doc/design-cluster-merger.rst \
        doc/devnotes.rst \
        doc/glossary.rst \
        doc/hooks.rst \
@@@ -235,6 -233,7 +235,7 @@@ dist_tools_SCRIPTS = 
        tools/burnin \
        tools/cfgshell \
        tools/cfgupgrade \
+       tools/cluster-merge \
        tools/lvmstrap
  
  pkglib_SCRIPTS = \
@@@ -247,6 -246,8 +248,8 @@@ EXTRA_DIST = 
        autotools/check-python-code \
        autotools/check-man \
        autotools/docbook-wrapper \
+       autotools/gen-coverage \
+       autotools/testrunner \
        $(RUN_IN_TEMPDIR) \
        daemons/daemon-util.in \
        daemons/ganeti-cleaner.in \
@@@ -313,23 -314,25 +316,26 @@@ TEST_FILES = 
        test/data/bdev-8.3-both.txt \
        test/data/bdev-disk.txt \
        test/data/bdev-net.txt \
+       test/data/cert1.pem \
        test/data/proc_drbd8.txt \
        test/data/proc_drbd80-emptyline.txt \
        test/data/proc_drbd83.txt
  
- dist_TESTS = \
+ python_tests = \
        test/ganeti.bdev_unittest.py \
        test/ganeti.cli_unittest.py \
        test/ganeti.cmdlib_unittest.py \
        test/ganeti.confd_client_unittest.py \
        test/ganeti.config_unittest.py \
        test/ganeti.constants_unittest.py \
+       test/ganeti.errors_unittest.py \
        test/ganeti.hooks_unittest.py \
        test/ganeti.http_unittest.py \
        test/ganeti.locking_unittest.py \
 +      test/ganeti.luxi_unittest.py \
        test/ganeti.mcpu_unittest.py \
        test/ganeti.objects_unittest.py \
+       test/ganeti.opcodes_unittest.py \
        test/ganeti.rapi.resources_unittest.py \
        test/ganeti.serializer_unittest.py \
        test/ganeti.ssh_unittest.py \
        test/docs_unittest.py \
        test/tempfile_fork_unittest.py
  
+ dist_TESTS = \
+       test/daemon-util_unittest.bash \
+       $(python_tests)
  nodist_TESTS =
  
  TESTS = $(dist_TESTS) $(nodist_TESTS)
  
+ # Environment for all tests
+ PLAIN_TESTS_ENVIRONMENT = \
+       PYTHONPATH=. TOP_SRCDIR=$(abs_top_srcdir) PYTHON=$(PYTHON) $(RUN_IN_TEMPDIR)
+ # Environment for tests run by automake
  TESTS_ENVIRONMENT = \
-       PYTHONPATH=. TOP_SRCDIR=$(abs_top_srcdir) \
-       $(RUN_IN_TEMPDIR) $(PYTHON)
+       $(PLAIN_TESTS_ENVIRONMENT) $(abs_top_srcdir)/autotools/testrunner
  
  all_python_code = \
        $(dist_sbin_SCRIPTS) \
        $(dist_tools_SCRIPTS) \
-       $(dist_TESTS) \
+       $(python_tests) \
        $(pkgpython_PYTHON) \
        $(hypervisor_PYTHON) \
        $(rapi_PYTHON) \
  
  srclink_files = \
        man/footer.sgml \
+       test/daemon-util_unittest.bash \
        $(all_python_code)
  
  check_python_code = \
@@@ -371,6 -383,8 +386,8 @@@ lint_python_code = 
        $(dist_tools_SCRIPTS) \
        $(BUILD_BASH_COMPLETION)
  
+ test/daemon-util_unittest.bash: daemons/daemon-util
  devel/upload: devel/upload.in $(REPLACE_VARS_SED)
        sed -f $(REPLACE_VARS_SED) < $< > $@
        chmod u+x $@
@@@ -442,12 -456,14 +459,14 @@@ lib/_autoconf.py: Makefile stamp-direct
          echo "VERSION_FULL = '$(VERSION_FULL)'"; \
          echo "LOCALSTATEDIR = '$(localstatedir)'"; \
          echo "SYSCONFDIR = '$(sysconfdir)'"; \
+         echo "SSH_CONFIG_DIR = '$(SSH_CONFIG_DIR)'"; \
          echo "EXPORT_DIR = '$(EXPORT_DIR)'"; \
          echo "OS_SEARCH_PATH = [$(OS_SEARCH_PATH)]"; \
          echo "XEN_BOOTLOADER = '$(XEN_BOOTLOADER)'"; \
          echo "XEN_KERNEL = '$(XEN_KERNEL)'"; \
          echo "XEN_INITRD = '$(XEN_INITRD)'"; \
          echo "FILE_STORAGE_DIR = '$(FILE_STORAGE_DIR)'"; \
+         echo "ENABLE_FILE_STORAGE = $(ENABLE_FILE_STORAGE)"; \
          echo "IALLOCATOR_SEARCH_PATH = [$(IALLOCATOR_SEARCH_PATH)]"; \
          echo "KVM_PATH = '$(KVM_PATH)'"; \
          echo "SOCAT_PATH = '$(SOCAT)'"; \
@@@ -537,4 -553,14 +556,14 @@@ TAGS: $(BUILT_SOURCES
          -path 'daemons/ganeti-*' -o -path 'tools/*' | \
          etags -
  
+ .PHONY: coverage
+ coverage: $(BUILT_SOURCES) $(python_tests)
+       set -e; \
+       mkdir -p doc/coverage; \
+       COVERAGE_FILE=$(CURDIR)/doc/coverage/data \
+       TEXT_COVERAGE=$(CURDIR)/doc/coverage/report.txt \
+       HTML_COVERAGE=$(CURDIR)/doc/coverage \
+       $(PLAIN_TESTS_ENVIRONMENT) $(abs_top_srcdir)/autotools/gen-coverage \
+       $(python_tests)
  # vim: set noet :
diff --combined NEWS
--- 1/NEWS
--- 2/NEWS
+++ b/NEWS
  News
  ====
  
 +Version 2.2.0
 +-------------
 +
 +- RAPI now requires a ``Content-Type`` header set to
 +  ``application/json`` for requests with a body (e.g. ``PUT`` or
 +  ``POST``); see RFC 2616 (HTTP/1.1), section 7.2.1
 +
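(A hedged illustration of a conforming client request, in Python 2-era
standard library code; the host name, resource URI and request body are
made-up examples, not taken from this patch. The 415 answer matches the
HttpUnsupportedMediaType raised in the daemons/ganeti-rapi hunk further
down; 5080 is the default RAPI port.)

    import httplib
    import json

    conn = httplib.HTTPSConnection("cluster.example.com", 5080)
    body = json.dumps({"example_param": "example_value"})
    # Without this header, a 2.2.0 RAPI daemon rejects the request
    # with "415 Unsupported Media Type".
    headers = {"Content-Type": "application/json"}
    conn.request("PUT", "/2/example/resource", body, headers)
    print conn.getresponse().status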
  
+ Version 2.1.1
+ -------------
+
+ During the 2.1.0 long release candidate cycle, a lot of improvements
+ and changes accumulated which were later released as 2.1.1.
+
+ Major changes
+ ~~~~~~~~~~~~~
+
+ The node evacuate command (``gnt-node evacuate``) was significantly
+ rewritten, and as such the IAllocator protocol was changed - a new
+ request type has been added. This unfortunate change during a stable
+ series is designed to improve the performance of node evacuations; on
+ well-balanced clusters with more than about five nodes, evacuation
+ should proceed in parallel for all instances of the node being
+ evacuated. As such, any existing IAllocator scripts need to be updated,
+ otherwise the above command will fail due to the unknown request. The
+ provided "dumb" allocator has not been updated, but the ganeti-htools
+ package supports the new protocol since version 0.2.4.
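(A rough sketch of the new request as seen by an IAllocator script; the
mode name comes from constants.IALLOCATOR_MODE_MEVAC used in the
cmdlib.py hunk below, but the exact key names here are assumptions for
illustration, not a protocol reference.)

    # Hypothetical multi-evacuate request fragment:
    request = {
        "type": "multi-evacuate",
        "evac_nodes": ["node2.example.com"],
    }
    # On success, the allocator is expected to return one
    # [instance_name, new_node] pair per affected instance, mirroring
    # the remote-node code path in LUNodeEvacuationStrategy below.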
+ Another important change is increased validation of node and instance
+ names. This might create problems in special cases if invalid host
+ names are being used.
+
+ Also, a new layer of hypervisor parameters has been added, which sits
+ at OS level between the cluster defaults and the instance ones. This
+ allows customisation of virtualization parameters depending on the
+ installed OS. For example, instances with OS 'X' may have a different
+ KVM kernel (or any other parameter) than the cluster defaults. This is
+ intended to help manage multiple OSes on the same cluster, without
+ manual modification of each instance's parameters.
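(A minimal sketch of the resulting parameter layering using plain
dicts; the real code uses objects.FillDict, as visible in the cmdlib.py
hunk below, and the parameter names and values here are invented for
illustration.)

    # Cluster-wide defaults for a hypervisor:
    cluster_hvparams = {"kernel_path": "/boot/vmlinuz-default",
                        "acpi": True}
    # New per-OS layer, overriding the cluster defaults:
    os_hvp = {"kernel_path": "/boot/vmlinuz-os-x"}
    # Per-instance parameters, applied last:
    instance_hvparams = {"acpi": False}

    effective = dict(cluster_hvparams)
    effective.update(os_hvp)             # OS layer wins over cluster
    effective.update(instance_hvparams)  # instance layer wins over both
    assert effective == {"kernel_path": "/boot/vmlinuz-os-x",
                         "acpi": False}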
+ A tool for merging clusters, ``cluster-merge``, has been added in the
+ tools sub-directory.
+
+ Bug fixes
+ ~~~~~~~~~
+
+ - Improved the int/float conversions, which should make the code more
+   robust in the face of errors from the node daemons
+ - Fixed the remove node code in case of internal configuration errors
+ - Fixed the node daemon behaviour in the face of an inconsistent queue
+   directory (e.g. a read-only file-system where we can't open the
+   files read-write, etc.)
+ - Fixed the behaviour of gnt-node modify for master candidate demotion;
+   now it either aborts cleanly or, if given the new “auto_promote”
+   parameter, will automatically promote other nodes as needed
+ - Fixed compatibility with the then-unreleased Python 2.6.5, which
+   would otherwise completely prevent Ganeti from working
+ - Fixed bug for instance export when not all disks were successfully
+   exported
+ - Fixed behaviour of node add when the new node is slow in starting up
+   the node daemon
+ - Fixed handling of signals in the LUXI client, which should improve
+   behaviour of command-line scripts
+ - Added checks for invalid node/instance names in the configuration (now
+   flagged during cluster verify)
+ - Fixed watcher behaviour for disk activation errors
+ - Fixed two potentially endless loops in http library, which led to the
+   RAPI daemon hanging and consuming 100% CPU in some cases
+ - Fixed bug in RAPI daemon related to hashed passwords
+ - Fixed bug for unintended qemu-level bridging of multi-NIC KVM
+   instances
+ - Enhanced compatibility with non-Debian OSes by not using absolute
+   paths in some commands and by allowing customisation of the ssh
+   configuration directory
+ - Fixed a possible future issue with new Python versions by abiding by
+   the proper use of the ``__slots__`` attribute on classes
+ - Added checks that should prevent directory traversal attacks
+ - Many documentation fixes based on feedback from users
+
+ New features
+ ~~~~~~~~~~~~
+
+ - Added an “early_release” mode for instance replace-disks and node
+   evacuate, where we release locks earlier and thus allow higher
+   parallelism within the cluster
+ - Added watcher hooks, intended to allow the watcher to restart other
+   daemons (e.g. from the ganeti-nbma project), though they can of
+   course be used for any other purpose
+ - Added a compile-time disable for DRBD barriers, to increase
+   performance if the administrator trusts the power supply or the
+   storage system to not lose writes
+ - Added the option of using syslog for logging instead of, or in
+   addition to, Ganeti's own log files
+ - Removed the boot restriction for paravirtual NICs under KVM; recent
+   versions can indeed boot from a paravirtual NIC
+ - Added a generic debug level for many operations; while this is not
+   used widely yet, it allows one to pass the debug value all the way to
+   the OS scripts
+ - Enhanced the hooks environment for instance moves (failovers,
+   migrations) where the primary/secondary nodes changed during the
+   operation, by adding {NEW,OLD}_{PRIMARY,SECONDARY} vars
+ - Enhanced data validations for many user-supplied values; one important
+   item is the restrictions imposed on instance and node names, which
+   might reject some (invalid) host names
+ - Added a configure-time option to disable file-based storage, if it's
+   not needed; this allows greater security separation between the
+   master node and the other nodes from the point of view of the
+   inter-node RPC protocol
+ - Added user notification in interactive tools if a job is waiting in
+   the job queue or trying to acquire locks
+ - Added log messages when a job is waiting for locks
+ - Added filtering by node tags in instance operations which admit
+   multiple instances (start, stop, reboot, reinstall)
+ - Added a new tool for cluster mergers, ``cluster-merge``
+ - Parameters from the command line which are of the form ``a=b,c=d``
+   can now use backslash escapes to pass in values which contain commas,
+   e.g. ``a=b\\c,d=e`` where the 'a' parameter would get the value
+   ``b,c`` (see the sketch after this list)
+ - For KVM, the instance name is the first parameter passed to KVM, so
+   that it's more visible in the process list
+
+
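(A rough sketch of comma splitting with backslash escapes, referenced
from the parameter-escaping item above; this illustrates the idea and
is not Ganeti's actual implementation.)

    def split_escaped(text):
        """Split text on commas, honouring backslash escapes."""
        parts, current, i = [], [], 0
        while i < len(text):
            if text[i] == "\\" and i + 1 < len(text):
                current.append(text[i + 1])  # keep escaped char literally
                i += 2
            elif text[i] == ",":
                parts.append("".join(current))
                current = []
                i += 1
            else:
                current.append(text[i])
                i += 1
        parts.append("".join(current))
        return parts

    # split_escaped("a=b\\,c,d=e") == ["a=b,c", "d=e"]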
  Version 2.1.0
  -------------
  
@@@ -108,6 -215,15 +222,15 @@@ Detail
  - Improved burnin
  
  
+ Version 2.0.6
+ -------------
+
+ - Fix cleaner behaviour on nodes not in a cluster (Debian bug 568105)
+ - Fix a string formatting bug
+ - Improve safety of the code in some error paths
+ - Improve data validation in the master of values returned from nodes
+
+
  Version 2.0.5
  -------------
  
diff --combined daemons/ganeti-rapi
@@@ -52,14 -52,13 +52,14 @@@ class RemoteApiRequestContext(object)
      self.handler = None
      self.handler_fn = None
      self.handler_access = None
 +    self.body_data = None
  
  
  class JsonErrorRequestExecutor(http.server.HttpServerRequestExecutor):
    """Custom Request Executor class that formats HTTP errors in JSON.
  
    """
 -  error_content_type = "application/json"
 +  error_content_type = http.HTTP_APP_JSON
  
    def _FormatErrorMessage(self, values):
      """Formats the body of an error message.
@@@ -117,22 -116,20 +117,23 @@@ class RemoteApiHttpServer(http.auth.Htt
        if ctx.handler_access is None:
          raise AssertionError("Permissions definition missing")
  
 +      # This is only made available in HandleRequest
 +      ctx.body_data = None
 +
        req.private = ctx
  
+     # Check for expected attributes
+     assert req.private.handler
+     assert req.private.handler_fn
+     assert req.private.handler_access is not None
      return req.private
  
-   def GetAuthRealm(self, req):
-     """Override the auth realm for queries.
+   def AuthenticationRequired(self, req):
+     """Determine whether authentication is required.
  
      """
-     ctx = self._GetRequestContext(req)
-     if ctx.handler_access:
-       return self.AUTH_REALM
-     else:
-       return None
+     return bool(self._GetRequestContext(req).handler_access)
  
    def Authenticate(self, req, username, password):
      """Checks whether a user can access a resource.
      """
      ctx = self._GetRequestContext(req)
  
 +    # Deserialize request parameters
 +    if req.request_body:
 +      # RFC2616, 7.2.1: Any HTTP/1.1 message containing an entity-body SHOULD
 +      # include a Content-Type header field defining the media type of that
 +      # body. [...] If the media type remains unknown, the recipient SHOULD
 +      # treat it as type "application/octet-stream".
 +      req_content_type = req.request_headers.get(http.HTTP_CONTENT_TYPE,
 +                                                 http.HTTP_APP_OCTET_STREAM)
 +      if req_content_type.lower() != http.HTTP_APP_JSON.lower():
 +        raise http.HttpUnsupportedMediaType()
 +
 +      try:
 +        ctx.body_data = serializer.LoadJson(req.request_body)
 +      except Exception:
 +        raise http.HttpBadRequest(message="Unable to parse JSON data")
 +    else:
 +      ctx.body_data = None
 +
      try:
        result = ctx.handler_fn()
 -      sn = ctx.handler.getSerialNumber()
 -      if sn:
 -        req.response_headers[http.HTTP_ETAG] = str(sn)
      except luxi.TimeoutError:
        raise http.HttpGatewayTimeout()
      except luxi.ProtocolError, err:
        logging.exception("Error while handling the %s request", method)
        raise
  
 -    return result
 +    req.resp_headers[http.HTTP_CONTENT_TYPE] = http.HTTP_APP_JSON
 +
 +    return serializer.DumpJson(result)
  
  
  def CheckRapi(options, args):
@@@ -244,9 -224,7 +245,9 @@@ def main()
  
    dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
    dirs.append((constants.LOG_OS_DIR, 0750))
 -  daemon.GenericMain(constants.RAPI, parser, dirs, CheckRapi, ExecRapi)
 +  daemon.GenericMain(constants.RAPI, parser, dirs, CheckRapi, ExecRapi,
 +                     default_ssl_cert=constants.RAPI_CERT_FILE,
 +                     default_ssl_key=constants.RAPI_CERT_FILE)
  
  
  if __name__ == "__main__":
diff --combined lib/bootstrap.py
@@@ -27,6 -27,7 +27,6 @@@ import o
  import os.path
  import re
  import logging
 -import tempfile
  import time
  
  from ganeti import rpc
@@@ -65,6 -66,40 +65,6 @@@ def _InitSSHSetup()
    utils.AddAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
  
  
 -def GenerateSelfSignedSslCert(file_name, validity=(365 * 5)):
 -  """Generates a self-signed SSL certificate.
 -
 -  @type file_name: str
 -  @param file_name: Path to output file
 -  @type validity: int
 -  @param validity: Validity for certificate in days
 -
 -  """
 -  (fd, tmp_file_name) = tempfile.mkstemp(dir=os.path.dirname(file_name))
 -  try:
 -    try:
 -      # Set permissions before writing key
 -      os.chmod(tmp_file_name, 0600)
 -
 -      result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
 -                             "-days", str(validity), "-nodes", "-x509",
 -                             "-keyout", tmp_file_name, "-out", tmp_file_name,
 -                             "-batch"])
 -      if result.failed:
 -        raise errors.OpExecError("Could not generate SSL certificate, command"
 -                                 " %s had exitcode %s and error message %s" %
 -                                 (result.cmd, result.exit_code, result.output))
 -
 -      # Make read-only
 -      os.chmod(tmp_file_name, 0400)
 -
 -      os.rename(tmp_file_name, file_name)
 -    finally:
 -      utils.RemoveFile(tmp_file_name)
 -  finally:
 -    os.close(fd)
 -
 -
  def GenerateHmacKey(file_name):
    """Writes a new HMAC key.
  
    @param file_name: Path to output file
  
    """
-   utils.WriteFile(file_name, data="%s\n" % utils.GenerateSecret(), mode=0400)
+   utils.WriteFile(file_name, data="%s\n" % utils.GenerateSecret(), mode=0400,
+                   backup=True)
+ def GenerateClusterCrypto(new_cluster_cert, new_rapi_cert, new_hmac_key,
+                           rapi_cert_pem=None):
+   """Updates the cluster certificates, keys and secrets.
+   @type new_cluster_cert: bool
+   @param new_cluster_cert: Whether to generate a new cluster certificate
+   @type new_rapi_cert: bool
+   @param new_rapi_cert: Whether to generate a new RAPI certificate
+   @type new_hmac_key: bool
+   @param new_hmac_key: Whether to generate a new HMAC key
+   @type rapi_cert_pem: string
+   @param rapi_cert_pem: New RAPI certificate in PEM format
+   """
+   # SSL certificate
+   cluster_cert_exists = os.path.exists(constants.SSL_CERT_FILE)
+   if new_cluster_cert or not cluster_cert_exists:
+     if cluster_cert_exists:
+       utils.CreateBackup(constants.SSL_CERT_FILE)
+     logging.debug("Generating new cluster certificate at %s",
+                   constants.SSL_CERT_FILE)
 -    GenerateSelfSignedSslCert(constants.SSL_CERT_FILE)
++    utils.GenerateSelfSignedSslCert(constants.SSL_CERT_FILE)
+   # HMAC key
+   if new_hmac_key or not os.path.exists(constants.HMAC_CLUSTER_KEY):
+     logging.debug("Writing new HMAC key to %s", constants.HMAC_CLUSTER_KEY)
+     GenerateHmacKey(constants.HMAC_CLUSTER_KEY)
+   # RAPI
+   rapi_cert_exists = os.path.exists(constants.RAPI_CERT_FILE)
+   if rapi_cert_pem:
+     # Assume rapi_pem contains a valid PEM-formatted certificate and key
+     logging.debug("Writing RAPI certificate at %s",
+                   constants.RAPI_CERT_FILE)
+     utils.WriteFile(constants.RAPI_CERT_FILE, data=rapi_cert_pem, backup=True)
+   elif new_rapi_cert or not rapi_cert_exists:
+     if rapi_cert_exists:
+       utils.CreateBackup(constants.RAPI_CERT_FILE)
+     logging.debug("Generating new RAPI certificate at %s",
+                   constants.RAPI_CERT_FILE)
 -    GenerateSelfSignedSslCert(constants.RAPI_CERT_FILE)
++    utils.GenerateSelfSignedSslCert(constants.RAPI_CERT_FILE)
  
  
  def _InitGanetiServerSetup(master_name):
    the cluster and also generates the SSL certificate.
  
    """
-   utils.GenerateSelfSignedSslCert(constants.SSL_CERT_FILE)
-   # Don't overwrite existing file
-   if not os.path.exists(constants.RAPI_CERT_FILE):
-     utils.GenerateSelfSignedSslCert(constants.RAPI_CERT_FILE)
-   if not os.path.exists(constants.HMAC_CLUSTER_KEY):
-     GenerateHmacKey(constants.HMAC_CLUSTER_KEY)
+   # Generate cluster secrets
+   GenerateClusterCrypto(True, False, False)
  
    result = utils.RunCmd([constants.DAEMON_UTIL, "start", constants.NODED])
    if result.failed:
                               " had exitcode %s and error %s" %
                               (result.cmd, result.exit_code, result.output))
  
-   # Wait for node daemon to become responsive
+   _WaitForNodeDaemon(master_name)
+ def _WaitForNodeDaemon(node_name):
+   """Wait for node daemon to become responsive.
+   """
    def _CheckNodeDaemon():
-     result = rpc.RpcRunner.call_version([master_name])[master_name]
+     result = rpc.RpcRunner.call_version([node_name])[node_name]
      if result.fail_msg:
        raise utils.RetryAgain()
  
    try:
      utils.Retry(_CheckNodeDaemon, 1.0, 10.0)
    except utils.RetryTimeout:
-     raise errors.OpExecError("Node daemon didn't answer queries within"
-                              " 10 seconds")
+     raise errors.OpExecError("Node daemon on %s didn't answer queries within"
+                              " 10 seconds" % node_name)
  
  def InitCluster(cluster_name, mac_prefix,
                  master_netdev, file_storage_dir, candidate_pool_size,
                                 " belong to this host. Aborting." %
                                 hostname.ip, errors.ECODE_ENVIRON)
  
-   clustername = utils.GetHostInfo(cluster_name)
+   clustername = utils.GetHostInfo(utils.HostInfo.NormalizeName(cluster_name))
  
    if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
                     timeout=5):
@@@ -391,6 -475,8 +440,8 @@@ def SetupNodeDaemon(cluster_name, node
                               " output: %s" %
                               (node, result.fail_reason, result.output))
  
+   _WaitForNodeDaemon(node)
  
  def MasterFailover(no_voting=False):
    """Failover the master node.
diff --combined lib/cli.py
@@@ -36,6 -36,7 +36,7 @@@ from ganeti import opcode
  from ganeti import luxi
  from ganeti import ssconf
  from ganeti import rpc
+ from ganeti import ssh
  
  from optparse import (OptionParser, TitledHelpFormatter,
                        Option, OptionValueError)
@@@ -45,6 -46,7 +46,7 @@@ __all__ = 
    # Command line options
    "ALLOCATABLE_OPT",
    "ALL_OPT",
+   "AUTO_PROMOTE_OPT",
    "AUTO_REPLACE_OPT",
    "BACKEND_OPT",
    "CLEANUP_OPT",
    "IALLOCATOR_OPT",
    "IGNORE_CONSIST_OPT",
    "IGNORE_FAILURES_OPT",
 +  "IGNORE_REMOVE_FAILURES_OPT",
    "IGNORE_SECONDARIES_OPT",
    "IGNORE_SIZE_OPT",
    "MAC_PREFIX_OPT",
    "MASTER_NETDEV_OPT",
    "MC_OPT",
    "NET_OPT",
+   "NEW_CLUSTER_CERT_OPT",
+   "NEW_HMAC_KEY_OPT",
+   "NEW_RAPI_CERT_OPT",
    "NEW_SECONDARY_OPT",
    "NIC_PARAMS_OPT",
    "NODE_LIST_OPT",
    "OFFLINE_OPT",
    "OS_OPT",
    "OS_SIZE_OPT",
+   "RAPI_CERT_OPT",
    "READD_OPT",
    "REBOOT_TYPE_OPT",
 +  "REMOVE_INSTANCE_OPT",
    "SECONDARY_IP_OPT",
    "SELECT_OS_OPT",
    "SEP_OPT",
    "JobExecutor",
    "JobSubmittedException",
    "ParseTimespec",
+   "RunWhileClusterStopped",
    "SubmitOpCode",
    "SubmitOrSend",
    "UsesRPC",
    "ARGS_NONE",
    "ARGS_ONE_INSTANCE",
    "ARGS_ONE_NODE",
+   "ARGS_ONE_OS",
    "ArgChoice",
    "ArgCommand",
    "ArgFile",
    "ArgInstance",
    "ArgJobId",
    "ArgNode",
+   "ArgOs",
    "ArgSuggest",
    "ArgUnknown",
    "OPT_COMPL_INST_ADD_NODES",
@@@ -248,11 -255,18 +257,18 @@@ class ArgHost(_Argument)
    """
  
  
+ class ArgOs(_Argument):
+   """OS argument.
+   """
  ARGS_NONE = []
  ARGS_MANY_INSTANCES = [ArgInstance()]
  ARGS_MANY_NODES = [ArgNode()]
  ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
  ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
+ ARGS_ONE_OS = [ArgOs(min=1, max=1)]
  
  
  def _ExtractTagsObject(opts, args):
@@@ -686,18 -700,6 +702,18 @@@ IGNORE_FAILURES_OPT = cli_option("--ign
                                   " configuration even if there are failures"
                                   " during the removal process")
  
 +IGNORE_REMOVE_FAILURES_OPT = cli_option("--ignore-remove-failures",
 +                                        dest="ignore_remove_failures",
 +                                        action="store_true", default=False,
 +                                        help="Remove the instance from the"
 +                                        " cluster configuration even if there"
 +                                        " are failures during the removal"
 +                                        " process")
 +
 +REMOVE_INSTANCE_OPT = cli_option("--remove-instance", dest="remove_instance",
 +                                 action="store_true", default=False,
 +                                 help="Remove the instance from the cluster")
 +
  NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
                                 help="Specifies the new secondary node",
                                 metavar="NODE", default=None,
@@@ -713,6 -715,11 +729,11 @@@ ON_SECONDARY_OPT = cli_option("-s", "--
                                help="Replace the disk(s) on the secondary"
                                " node (only for the drbd template)")
  
+ AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
+                               default=False, action="store_true",
+                               help="Lock all nodes and auto-promote as needed"
+                               " to MC status")
  AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
                                default=False, action="store_true",
                                help="Automatically replace faulty disks"
@@@ -857,6 -864,24 +878,24 @@@ EARLY_RELEASE_OPT = cli_option("--early
                                 help="Release the locks on the secondary"
                                 " node(s) early")
  
+ NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
+                                   dest="new_cluster_cert",
+                                   default=False, action="store_true",
+                                   help="Generate a new cluster certificate")
+ RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
+                            default=None,
+                            help="File containing new RAPI certificate")
+ NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
+                                default=None, action="store_true",
+                                help=("Generate a new self-signed RAPI"
+                                      " certificate"))
+ NEW_HMAC_KEY_OPT = cli_option("--new-hmac-key", dest="new_hmac_key",
+                               default=False, action="store_true",
+                               help="Create a new HMAC key")
  
  def _ParseArgs(argv, commands, aliases):
    """Parser for the command line arguments.
@@@ -1146,12 -1171,28 +1185,28 @@@ def PollJob(job_id, cl=None, feedback_f
    prev_job_info = None
    prev_logmsg_serial = None
  
+   status = None
+   notified_queued = False
+   notified_waitlock = False
    while True:
-     result = cl.WaitForJobChange(job_id, ["status"], prev_job_info,
-                                  prev_logmsg_serial)
+     result = cl.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
+                                      prev_logmsg_serial)
      if not result:
        # job not found, go away!
        raise errors.JobLost("Job with id %s lost" % job_id)
+     elif result == constants.JOB_NOTCHANGED:
+       if status is not None and not callable(feedback_fn):
+         if status == constants.JOB_STATUS_QUEUED and not notified_queued:
+           ToStderr("Job %s is waiting in queue", job_id)
+           notified_queued = True
+         elif status == constants.JOB_STATUS_WAITLOCK and not notified_waitlock:
+           ToStderr("Job %s is trying to acquire all necessary locks", job_id)
+           notified_waitlock = True
+       # Wait again
+       continue
  
      # Split result, a tuple of (field values, log entries)
      (job_info, log_entries) = result
@@@ -1342,6 -1383,8 +1397,6 @@@ def FormatError(err)
      obuf.write("Parameter Error: %s" % msg)
    elif isinstance(err, errors.ParameterError):
      obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
 -  elif isinstance(err, errors.GenericError):
 -    obuf.write("Unhandled Ganeti error: %s" % msg)
    elif isinstance(err, luxi.NoMasterError):
      obuf.write("Cannot communicate with the master daemon.\nIs it running"
                 " and listening for connections?")
    elif isinstance(err, luxi.ProtocolError):
      obuf.write("Unhandled protocol error while talking to the master daemon:\n"
                 "%s" % msg)
 +  elif isinstance(err, errors.GenericError):
 +    obuf.write("Unhandled Ganeti error: %s" % msg)
    elif isinstance(err, JobSubmittedException):
      obuf.write("JobID: %s\n" % err.args[0])
      retcode = 0
@@@ -1532,6 -1573,127 +1587,127 @@@ def GenericInstanceCreate(mode, opts, a
    return 0
  
  
+ class _RunWhileClusterStoppedHelper:
+   """Helper class for L{RunWhileClusterStopped} to simplify state management
+   """
+   def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
+     """Initializes this class.
+     @type feedback_fn: callable
+     @param feedback_fn: Feedback function
+     @type cluster_name: string
+     @param cluster_name: Cluster name
+     @type master_node: string
+     @param master_node: Master node name
+     @type online_nodes: list
+     @param online_nodes: List of names of online nodes
+     """
+     self.feedback_fn = feedback_fn
+     self.cluster_name = cluster_name
+     self.master_node = master_node
+     self.online_nodes = online_nodes
+     self.ssh = ssh.SshRunner(self.cluster_name)
+     self.nonmaster_nodes = [name for name in online_nodes
+                             if name != master_node]
+     assert self.master_node not in self.nonmaster_nodes
+   def _RunCmd(self, node_name, cmd):
+     """Runs a command on the local or a remote machine.
+     @type node_name: string
+     @param node_name: Machine name
+     @type cmd: list
+     @param cmd: Command
+     """
+     if node_name is None or node_name == self.master_node:
+       # No need to use SSH
+       result = utils.RunCmd(cmd)
+     else:
+       result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))
+     if result.failed:
+       errmsg = ["Failed to run command %s" % result.cmd]
+       if node_name:
+         errmsg.append("on node %s" % node_name)
+       errmsg.append(": exitcode %s and error %s" %
+                     (result.exit_code, result.output))
+       raise errors.OpExecError(" ".join(errmsg))
+   def Call(self, fn, *args):
+     """Call function while all daemons are stopped.
+     @type fn: callable
+     @param fn: Function to be called
+     """
+     # Pause watcher by acquiring an exclusive lock on watcher state file
+     self.feedback_fn("Blocking watcher")
+     watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
+     try:
+       # TODO: Currently, this just blocks. There's no timeout.
+       # TODO: Should it be a shared lock?
+       watcher_block.Exclusive(blocking=True)
+       # Stop master daemons, so that no new jobs can come in and all running
+       # ones are finished
+       self.feedback_fn("Stopping master daemons")
+       self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
+       try:
+         # Stop daemons on all nodes
+         for node_name in self.online_nodes:
+           self.feedback_fn("Stopping daemons on %s" % node_name)
+           self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])
+         # All daemons are shut down now
+         try:
+           return fn(self, *args)
+         except Exception:
+           logging.exception("Caught exception")
+           raise
+       finally:
+         # Start cluster again, master node last
+         for node_name in self.nonmaster_nodes + [self.master_node]:
+           self.feedback_fn("Starting daemons on %s" % node_name)
+           self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
+     finally:
+       # Resume watcher
+       watcher_block.Close()
+ def RunWhileClusterStopped(feedback_fn, fn, *args):
+   """Calls a function while all cluster daemons are stopped.
+   @type feedback_fn: callable
+   @param feedback_fn: Feedback function
+   @type fn: callable
+   @param fn: Function to be called when daemons are stopped
+   """
+   feedback_fn("Gathering cluster information")
+   # This ensures we're running on the master daemon
+   cl = GetClient()
+   (cluster_name, master_node) = \
+     cl.QueryConfigValues(["cluster_name", "master_node"])
+   online_nodes = GetOnlineNodes([], cl=cl)
+   # Don't keep a reference to the client. The master daemon will go away.
+   del cl
+   assert master_node in online_nodes
+   return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node,
+                                        online_nodes).Call(fn, *args)
  def GenerateTable(headers, fields, separator, data,
                    numfields=None, unitfields=None,
                    units=None):
@@@ -1773,7 -1935,7 +1949,7 @@@ class JobExecutor(object)
    GetResults() calls.
  
    """
-   def __init__(self, cl=None, verbose=True, opts=None):
+   def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
      self.queue = []
      if cl is None:
        cl = GetClient()
      self.verbose = verbose
      self.jobs = []
      self.opts = opts
+     self.feedback_fn = feedback_fn
  
    def QueueJob(self, name, *ops):
      """Record a job for later submit.
  
      """
      results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
-     for ((status, data), (name, _)) in zip(results, self.queue):
-       self.jobs.append((status, data, name))
+     for (idx, ((status, data), (name, _))) in enumerate(zip(results,
+                                                             self.queue)):
+       self.jobs.append((idx, status, data, name))
+   def _ChooseJob(self):
+     """Choose a non-waiting/queued job to poll next.
+     """
+     assert self.jobs, "_ChooseJob called with empty job list"
+     result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
+     assert result
+     for job_data, status in zip(self.jobs, result):
+       if status[0] in (constants.JOB_STATUS_QUEUED,
+                        constants.JOB_STATUS_WAITLOCK,
+                        constants.JOB_STATUS_CANCELING):
+         # job is still waiting
+         continue
+       # good candidate found
+       self.jobs.remove(job_data)
+       return job_data
+     # no job found
+     return self.jobs.pop(0)
  
    def GetResults(self):
      """Wait for and return the results of all jobs.
        self.SubmitPending()
      results = []
      if self.verbose:
-       ok_jobs = [row[1] for row in self.jobs if row[0]]
+       ok_jobs = [row[2] for row in self.jobs if row[1]]
        if ok_jobs:
          ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))
-     for submit_status, jid, name in self.jobs:
-       if not submit_status:
-         ToStderr("Failed to submit job for %s: %s", name, jid)
-         results.append((False, jid))
-         continue
-       if self.verbose:
-         ToStdout("Waiting for job %s for %s...", jid, name)
+     # first, remove any non-submitted jobs
+     self.jobs, failures = utils.partition(self.jobs, lambda x: x[1])
+     for idx, _, jid, name in failures:
+       ToStderr("Failed to submit job for %s: %s", name, jid)
+       results.append((idx, False, jid))
+     while self.jobs:
+       (idx, _, jid, name) = self._ChooseJob()
+       ToStdout("Waiting for job %s for %s...", jid, name)
        try:
-         job_result = PollJob(jid, cl=self.cl)
+         job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
          success = True
        except (errors.GenericError, luxi.ProtocolError), err:
          _, job_result = FormatError(err)
          # the error message will always be shown, verbose or not
          ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
  
-       results.append((success, job_result))
+       results.append((idx, success, job_result))
+     # sort based on the index, then drop it
+     results.sort()
+     results = [i[1:] for i in results]
      return results
  
    def WaitOrShow(self, wait):
diff --combined lib/cmdlib.py
@@@ -33,6 -33,7 +33,7 @@@ import r
  import platform
  import logging
  import copy
+ import OpenSSL
  
  from ganeti import ssh
  from ganeti import utils
@@@ -848,6 -849,13 +849,13 @@@ def _FindFaultyInstanceDisks(cfg, rpc, 
    return faulty
  
  
+ def _FormatTimestamp(secs):
+   """Formats a Unix timestamp with the local timezone.
+   """
+   return time.strftime("%F %T %Z", time.gmtime(secs))
  class LUPostInitCluster(LogicalUnit):
    """Logical unit for running hooks after cluster initialization.
  
@@@ -939,6 -947,66 +947,66 @@@ class LUDestroyCluster(LogicalUnit)
      return master
  
  
+ def _VerifyCertificateInner(filename, expired, not_before, not_after, now,
+                             warn_days=constants.SSL_CERT_EXPIRATION_WARN,
+                             error_days=constants.SSL_CERT_EXPIRATION_ERROR):
+   """Verifies certificate details for LUVerifyCluster.
+   """
+   if expired:
+     msg = "Certificate %s is expired" % filename
+     if not_before is not None and not_after is not None:
+       msg += (" (valid from %s to %s)" %
+               (_FormatTimestamp(not_before),
+                _FormatTimestamp(not_after)))
+     elif not_before is not None:
+       msg += " (valid from %s)" % _FormatTimestamp(not_before)
+     elif not_after is not None:
+       msg += " (valid until %s)" % _FormatTimestamp(not_after)
+     return (LUVerifyCluster.ETYPE_ERROR, msg)
+   elif not_before is not None and not_before > now:
+     return (LUVerifyCluster.ETYPE_WARNING,
+             "Certificate %s not yet valid (valid from %s)" %
+             (filename, _FormatTimestamp(not_before)))
+   elif not_after is not None:
+     remaining_days = int((not_after - now) / (24 * 3600))
+     msg = ("Certificate %s expires in %d days" % (filename, remaining_days))
+     if remaining_days <= error_days:
+       return (LUVerifyCluster.ETYPE_ERROR, msg)
+     if remaining_days <= warn_days:
+       return (LUVerifyCluster.ETYPE_WARNING, msg)
+   return (None, None)
+ def _VerifyCertificate(filename):
+   """Verifies a certificate for LUVerifyCluster.
+   @type filename: string
+   @param filename: Path to PEM file
+   """
+   try:
+     cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
+                                            utils.ReadFile(filename))
+   except Exception, err: # pylint: disable-msg=W0703
+     return (LUVerifyCluster.ETYPE_ERROR,
+             "Failed to load X509 certificate %s: %s" % (filename, err))
+   # Depending on the pyOpenSSL version, this can just return (None, None)
+   (not_before, not_after) = utils.GetX509CertValidity(cert)
+   return _VerifyCertificateInner(filename, cert.has_expired(),
+                                  not_before, not_after, time.time())
  class LUVerifyCluster(LogicalUnit):
    """Verifies the cluster status.
  
    TINSTANCE = "instance"
  
    ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
+   ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
    EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
    EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
    EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
      for msg in self.cfg.VerifyConfig():
        _ErrorIf(True, self.ECLUSTERCFG, None, msg)
  
+     # Check the cluster certificates
+     for cert_filename in constants.ALL_CERT_FILES:
+       (errcode, msg) = _VerifyCertificate(cert_filename)
+       _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)
      vg_name = self.cfg.GetVGName()
      hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
      nodelist = utils.NiceSort(self.cfg.GetNodeList())
      master_files = [constants.CLUSTER_CONF_FILE]
  
      file_names = ssconf.SimpleStore().GetFileList()
-     file_names.append(constants.SSL_CERT_FILE)
-     file_names.append(constants.RAPI_CERT_FILE)
+     file_names.extend(constants.ALL_CERT_FILES)
      file_names.extend(master_files)
  
      local_checksums = utils.FingerprintFiles(file_names)
        idata = nresult.get(constants.NV_INSTANCELIST, None)
        test = not isinstance(idata, list)
        _ErrorIf(test, self.ENODEHV, node,
-                "rpc call to node failed (instancelist)")
+                "rpc call to node failed (instancelist): %s",
+                utils.SafeEncode(str(idata)))
        if test:
          continue
  
        try:
          ntime_merged = utils.MergeTime(ntime)
        except (ValueError, TypeError):
-         _ErrorIf(test, self.ENODETIME, node, "Node returned invalid time")
+         _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
  
        if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
-         ntime_diff = abs(nvinfo_starttime - ntime_merged)
+         ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
        elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
-         ntime_diff = abs(ntime_merged - nvinfo_endtime)
+         ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
        else:
          ntime_diff = None
  
        _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
-                "Node time diverges by at least %0.1fs from master node time",
+                "Node time diverges by at least %s from master node time",
                 ntime_diff)
  
        if ntime_diff is not None:
          _ErrorIf(snode not in node_info and snode not in n_offline,
                   self.ENODERPC, snode,
                   "instance %s, connection to secondary node"
-                  "failed", instance)
+                  " failed", instance)
  
          if snode in node_info:
            node_info[snode]['sinst'].append(instance)
            if test:
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
-             lu_result = 1
+             lu_result = 0
  
        return lu_result
  
@@@ -2064,6 -2138,25 +2138,25 @@@ class LUSetClusterParams(LogicalUnit)
          else:
            self.new_hvparams[hv_name].update(hv_dict)
  
+     # os hypervisor parameters
+     self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
+     if self.op.os_hvp:
+       if not isinstance(self.op.os_hvp, dict):
+         raise errors.OpPrereqError("Invalid 'os_hvp' parameter on input",
+                                    errors.ECODE_INVAL)
+       for os_name, hvs in self.op.os_hvp.items():
+         if not isinstance(hvs, dict):
+           raise errors.OpPrereqError(("Invalid 'os_hvp' parameter on"
+                                       " input"), errors.ECODE_INVAL)
+         if os_name not in self.new_os_hvp:
+           self.new_os_hvp[os_name] = hvs
+         else:
+           for hv_name, hv_dict in hvs.items():
+             if hv_name not in self.new_os_hvp[os_name]:
+               self.new_os_hvp[os_name][hv_name] = hv_dict
+             else:
+               self.new_os_hvp[os_name][hv_name].update(hv_dict)
      if self.op.enabled_hypervisors is not None:
        self.hv_list = self.op.enabled_hypervisors
        if not self.hv_list:
            hv_class.CheckParameterSyntax(hv_params)
            _CheckHVParams(self, node_list, hv_name, hv_params)
  
+     if self.op.os_hvp:
+       # no need to check any newly-enabled hypervisors, since the
+       # defaults have already been checked in the above code-block
+       for os_name, os_hvp in self.new_os_hvp.items():
+         for hv_name, hv_params in os_hvp.items():
+           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
+           # we need to fill in the new os_hvp on top of the actual hv_p
+           cluster_defaults = self.new_hvparams.get(hv_name, {})
+           new_osp = objects.FillDict(cluster_defaults, hv_params)
+           hv_class = hypervisor.GetHypervisor(hv_name)
+           hv_class.CheckParameterSyntax(new_osp)
+           _CheckHVParams(self, node_list, hv_name, new_osp)
    def Exec(self, feedback_fn):
      """Change the parameters of the cluster.
  
                      " state, not changing")
      if self.op.hvparams:
        self.cluster.hvparams = self.new_hvparams
+     if self.op.os_hvp:
+       self.cluster.os_hvp = self.new_os_hvp
      if self.op.enabled_hypervisors is not None:
        self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
      if self.op.beparams:
@@@ -2134,7 -2243,7 +2243,7 @@@ def _RedistributeAncillaryFiles(lu, add
    """
    # 1. Gather target nodes
    myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
-   dist_nodes = lu.cfg.GetNodeList()
+   dist_nodes = lu.cfg.GetOnlineNodeList()
    if additional_nodes is not None:
      dist_nodes.extend(additional_nodes)
    if myself.name in dist_nodes:
@@@ -2907,6 -3016,10 +3016,10 @@@ class LUAddNode(LogicalUnit)
    HTYPE = constants.HTYPE_NODE
    _OP_REQP = ["node_name"]
  
+   def CheckArguments(self):
+     # validate/normalize the node name
+     self.op.node_name = utils.HostInfo.NormalizeName(self.op.node_name)
    def BuildHooksEnv(self):
      """Build hooks env.
  
@@@ -3133,6 -3246,7 +3246,7 @@@ class LUSetNodeParams(LogicalUnit)
      _CheckBooleanOpField(self.op, 'master_candidate')
      _CheckBooleanOpField(self.op, 'offline')
      _CheckBooleanOpField(self.op, 'drained')
+     _CheckBooleanOpField(self.op, 'auto_promote')
      all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
      if all_mods.count(None) == 3:
        raise errors.OpPrereqError("Please pass at least one modification",
                                   " state at the same time",
                                   errors.ECODE_INVAL)
  
+     # Boolean value that tells us whether we're offlining or draining the node
+     self.offline_or_drain = (self.op.offline == True or
+                              self.op.drained == True)
+     self.deoffline_or_drain = (self.op.offline == False or
+                                self.op.drained == False)
+     self.might_demote = (self.op.master_candidate == False or
+                          self.offline_or_drain)
+     self.lock_all = self.op.auto_promote and self.might_demote
    def ExpandNames(self):
-     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
+     if self.lock_all:
+       self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
+     else:
+       self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
  
    def BuildHooksEnv(self):
      """Build hooks env.
                                     " only via masterfailover",
                                     errors.ECODE_INVAL)
  
-     # Boolean value that tells us whether we're offlining or draining the node
-     offline_or_drain = self.op.offline == True or self.op.drained == True
-     deoffline_or_drain = self.op.offline == False or self.op.drained == False
-     if (node.master_candidate and
-         (self.op.master_candidate == False or offline_or_drain)):
-       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
-       mc_now, mc_should, mc_max = self.cfg.GetMasterCandidateStats()
-       if mc_now <= cp_size:
-         msg = ("Not enough master candidates (desired"
-                " %d, new value will be %d)" % (cp_size, mc_now-1))
-         # Only allow forcing the operation if it's an offline/drain operation,
-         # and we could not possibly promote more nodes.
-         # FIXME: this can still lead to issues if in any way another node which
-         # could be promoted appears in the meantime.
-         if self.op.force and offline_or_drain and mc_should == mc_max:
-           self.LogWarning(msg)
-         else:
-           raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
+     if node.master_candidate and self.might_demote and not self.lock_all:
+       assert not self.op.auto_promote, "auto-promote set but lock_all not"
+       # check if after removing the current node, we're missing master
+       # candidates
+       (mc_remaining, mc_should, _) = \
+           self.cfg.GetMasterCandidateStats(exceptions=[node.name])
+       if mc_remaining != mc_should:
+         raise errors.OpPrereqError("Not enough master candidates, please"
+                                    " pass auto_promote to allow promotion",
+                                    errors.ECODE_INVAL)
  
      if (self.op.master_candidate == True and
          ((node.offline and not self.op.offline == False) or
                                   errors.ECODE_INVAL)
  
      # If we're being deofflined/drained, we'll MC ourself if needed
-     if (deoffline_or_drain and not offline_or_drain and not
+     if (self.deoffline_or_drain and not self.offline_or_drain and not
          self.op.master_candidate == True and not node.master_candidate):
        self.op.master_candidate = _DecideSelfPromotion(self)
        if self.op.master_candidate:
            node.offline = False
            result.append(("offline", "clear offline status due to drain"))
  
+     # we locked all nodes, we adjust the CP before updating this node
+     if self.lock_all:
+       _AdjustCandidatePool(self, [node.name])
      # this will trigger configuration file update, if needed
      self.cfg.Update(node, feedback_fn)
      # this will trigger job queue propagation or cleanup
      if changed_mc:
        self.context.ReaddNode(node)
@@@ -3332,6 -3457,15 +3457,15 @@@ class LUQueryClusterInfo(NoHooksLU)
  
      """
      cluster = self.cfg.GetClusterInfo()
+     os_hvp = {}
+     # Filter just for enabled hypervisors
+     for os_name, hv_dict in cluster.os_hvp.items():
+       os_hvp[os_name] = {}
+       for hv_name, hv_params in hv_dict.items():
+         if hv_name in cluster.enabled_hypervisors:
+           os_hvp[os_name][hv_name] = hv_params
      result = {
        "software_version": constants.RELEASE_VERSION,
        "protocol_version": constants.PROTOCOL_VERSION,
        "enabled_hypervisors": cluster.enabled_hypervisors,
        "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                          for hypervisor_name in cluster.enabled_hypervisors]),
+       "os_hvp": os_hvp,
        "beparams": cluster.beparams,
        "nicparams": cluster.nicparams,
        "candidate_pool_size": cluster.candidate_pool_size,
@@@ -3396,7 -3531,7 +3531,7 @@@ class LUQueryConfigValues(NoHooksLU)
        elif field == "drain_flag":
          entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
        elif field == "watcher_pause":
-         return utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
+         entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
        else:
          raise errors.ParameterError(field)
        values.append(entry)
@@@ -4247,29 -4382,18 +4382,29 @@@ class LURemoveInstance(LogicalUnit)
                                   " node %s: %s" %
                                   (instance.name, instance.primary_node, msg))
  
 -    logging.info("Removing block devices for instance %s", instance.name)
 +    _RemoveInstance(self, feedback_fn, instance, self.op.ignore_failures)
  
 -    if not _RemoveDisks(self, instance):
 -      if self.op.ignore_failures:
 -        feedback_fn("Warning: can't remove instance's disks")
 -      else:
 -        raise errors.OpExecError("Can't remove instance's disks")
  
 -    logging.info("Removing instance %s out of cluster config", instance.name)
 +def _RemoveInstance(lu, feedback_fn, instance, ignore_failures):
 +  """Utility function to remove an instance.
 +
 +  """
 +  logging.info("Removing block devices for instance %s", instance.name)
 +
 +  if not _RemoveDisks(lu, instance):
 +    if not ignore_failures:
 +      raise errors.OpExecError("Can't remove instance's disks")
 +    feedback_fn("Warning: can't remove instance's disks")
 +
 +  logging.info("Removing instance %s out of cluster config", instance.name)
 +
 +  lu.cfg.RemoveInstance(instance.name)
 +
 +  assert not lu.remove_locks.get(locking.LEVEL_INSTANCE), \
 +    "Instance lock removal conflict"
  
 -    self.cfg.RemoveInstance(instance.name)
 -    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
 +  # Remove lock for the instance
 +  lu.remove_locks[locking.LEVEL_INSTANCE] = instance.name
  
  
  class LUQueryInstances(NoHooksLU):
@@@ -5665,10 -5789,16 +5800,16 @@@ class LUCreateInstance(LogicalUnit)
      # for tools
      if not hasattr(self.op, "name_check"):
        self.op.name_check = True
+     # validate/normalize the instance name
+     self.op.instance_name = utils.HostInfo.NormalizeName(self.op.instance_name)
      if self.op.ip_check and not self.op.name_check:
        # TODO: make the ip check more flexible and not depend on the name check
        raise errors.OpPrereqError("Cannot do ip checks without a name check",
                                   errors.ECODE_INVAL)
+     if (self.op.disk_template == constants.DT_FILE and
+         not constants.ENABLE_FILE_STORAGE):
+       raise errors.OpPrereqError("File storage disabled at configure time",
+                                  errors.ECODE_INVAL)
  
    def ExpandNames(self):
      """ExpandNames for CreateInstance.
            self.needed_locks[locking.LEVEL_NODE].append(src_node)
          if not os.path.isabs(src_path):
            self.op.src_path = src_path = \
-             os.path.join(constants.EXPORT_DIR, src_path)
+             utils.PathJoin(constants.EXPORT_DIR, src_path)
  
        # On import force_variant must be True, because if we forced it at
        # initial install, our only chance when importing it back is that it
                                   " iallocator '%s': %s" %
                                   (self.op.iallocator, ial.info),
                                   errors.ECODE_NORES)
-     if len(ial.nodes) != ial.required_nodes:
+     if len(ial.result) != ial.required_nodes:
        raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                   " of nodes (%s), required %s" %
-                                  (self.op.iallocator, len(ial.nodes),
+                                  (self.op.iallocator, len(ial.result),
                                    ial.required_nodes), errors.ECODE_FAULT)
-     self.op.pnode = ial.nodes[0]
+     self.op.pnode = ial.result[0]
      self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                   self.op.instance_name, self.op.iallocator,
-                  utils.CommaJoin(ial.nodes))
+                  utils.CommaJoin(ial.result))
      if ial.required_nodes == 2:
-       self.op.snode = ial.nodes[1]
+       self.op.snode = ial.result[1]
  
    def BuildHooksEnv(self):
      """Build hooks env.
            self.secondaries)
      return env, nl, nl
  
 -
    def CheckPrereq(self):
      """Check prerequisites.
  
            if src_path in exp_list[node].payload:
              found = True
              self.op.src_node = src_node = node
-             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
-                                                        src_path)
+             self.op.src_path = src_path = utils.PathJoin(constants.EXPORT_DIR,
+                                                          src_path)
              break
          if not found:
            raise errors.OpPrereqError("No export found for relative path %s" %
          if export_info.has_option(constants.INISECT_INS, option):
            # FIXME: are the old os-es, disk sizes, etc. useful?
            export_name = export_info.get(constants.INISECT_INS, option)
-           image = os.path.join(src_path, export_name)
+           image = utils.PathJoin(src_path, export_name)
            disk_images.append(image)
          else:
            disk_images.append(False)
      else:
        network_port = None
  
 -    ##if self.op.vnc_bind_address is None:
 -    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
 -
      # this is needed because os.path.join does not accept None arguments
      if self.op.file_storage_dir is None:
        string_file_storage_dir = ""
        string_file_storage_dir = self.op.file_storage_dir
  
      # build the full file storage dir path
-     file_storage_dir = os.path.normpath(os.path.join(
-                                         self.cfg.GetFileStorageDir(),
-                                         string_file_storage_dir, instance))
+     file_storage_dir = utils.PathJoin(self.cfg.GetFileStorageDir(),
+                                       string_file_storage_dir, instance)
  
 -
      disks = _GenerateDiskTemplate(self,
                                    self.op.disk_template,
                                    instance, pnode_name,
@@@ -6555,14 -6689,14 +6695,14 @@@ class TLReplaceDisks(Tasklet)
                                   " %s" % (iallocator_name, ial.info),
                                   errors.ECODE_NORES)
  
-     if len(ial.nodes) != ial.required_nodes:
+     if len(ial.result) != ial.required_nodes:
        raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                   " of nodes (%s), required %s" %
                                   (iallocator_name,
-                                   len(ial.nodes), ial.required_nodes),
+                                   len(ial.result), ial.required_nodes),
                                   errors.ECODE_FAULT)
  
-     remote_node_name = ial.nodes[0]
+     remote_node_name = ial.result[0]
  
      lu.LogInfo("Selected new secondary for instance '%s': %s",
                 instance_name, remote_node_name)
@@@ -7218,6 -7352,60 +7358,60 @@@ class LURepairNodeStorage(NoHooksLU)
                   (self.op.name, self.op.node_name))
  
  
+ class LUNodeEvacuationStrategy(NoHooksLU):
+   """Computes the node evacuation strategy.
+   """
+   _OP_REQP = ["nodes"]
+   REQ_BGL = False
+   def CheckArguments(self):
+     if not hasattr(self.op, "remote_node"):
+       self.op.remote_node = None
+     if not hasattr(self.op, "iallocator"):
+       self.op.iallocator = None
+     if self.op.remote_node is not None and self.op.iallocator is not None:
+       raise errors.OpPrereqError("Give either the iallocator or the new"
+                                  " secondary, not both", errors.ECODE_INVAL)
+   def ExpandNames(self):
+     self.op.nodes = _GetWantedNodes(self, self.op.nodes)
+     self.needed_locks = locks = {}
+     if self.op.remote_node is None:
+       locks[locking.LEVEL_NODE] = locking.ALL_SET
+     else:
+       self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
+       locks[locking.LEVEL_NODE] = self.op.nodes + [self.op.remote_node]
+ 
+   def CheckPrereq(self):
+     pass
+ 
+   def Exec(self, feedback_fn):
+     if self.op.remote_node is not None:
+       instances = []
+       for node in self.op.nodes:
+         instances.extend(_GetNodeSecondaryInstances(self.cfg, node))
+       result = []
+       for i in instances:
+         if i.primary_node == self.op.remote_node:
+           raise errors.OpPrereqError("Node %s is the primary node of"
+                                      " instance %s, cannot use it as"
+                                      " secondary" %
+                                      (self.op.remote_node, i.name),
+                                      errors.ECODE_INVAL)
+         result.append([i.name, self.op.remote_node])
+     else:
+       ial = IAllocator(self.cfg, self.rpc,
+                        mode=constants.IALLOCATOR_MODE_MEVAC,
+                        evac_nodes=self.op.nodes)
+       ial.Run(self.op.iallocator, validate=True)
+       if not ial.success:
+         raise errors.OpExecError("No valid evacuation solution: %s" % ial.info,
+                                  errors.ECODE_NORES)
+       result = ial.result
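+       # The allocator's result is expected to be a list of
+       # [instance_name, new_secondary_node] pairs, the same shape as the
+       # list built in the remote_node branch above (names illustrative,
+       # e.g. [["instance1", "node3"]]).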
+     return result
+ 
+ 
  class LUGrowDisk(LogicalUnit):
    """Grow a disk of an instance.
  
@@@ -8057,22 -8245,11 +8251,22 @@@ class LUExportInstance(LogicalUnit)
      """Check the arguments.
  
      """
 +    _CheckBooleanOpField(self.op, "remove_instance")
 +    _CheckBooleanOpField(self.op, "ignore_remove_failures")
 +
      self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
                                      constants.DEFAULT_SHUTDOWN_TIMEOUT)
 +    self.remove_instance = getattr(self.op, "remove_instance", False)
 +    self.ignore_remove_failures = getattr(self.op, "ignore_remove_failures",
 +                                          False)
 +
 +    if self.remove_instance and not self.op.shutdown:
 +      raise errors.OpPrereqError("Can not remove instance without shutting it"
 +                                 " down before")
  
    def ExpandNames(self):
      self._ExpandAndLockInstance()
 +
      # FIXME: lock only instance primary and destination node
      #
      # Sad but true, for now we have do lock all nodes, as we don't know where
        "EXPORT_NODE": self.op.target_node,
        "EXPORT_DO_SHUTDOWN": self.op.shutdown,
        "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
 +      # TODO: Generic function for boolean env variables
 +      "REMOVE_INSTANCE": str(bool(self.remove_instance)),
        }
      env.update(_BuildInstanceHookEnvByObject(self, self.instance))
      nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
      _CheckNodeNotDrained(self, self.dst_node.name)
  
      # instance disk type verification
 +    # TODO: Implement export support for file-based disks
      for disk in self.instance.disks:
        if disk.dev_type == constants.LD_FILE:
          raise errors.OpPrereqError("Export not supported for instances with"
        feedback_fn("Shutting down instance %s" % instance.name)
        result = self.rpc.call_instance_shutdown(src_node, instance,
                                                 self.shutdown_timeout)
 +      # TODO: Maybe ignore failures if ignore_remove_failures is set
        result.Raise("Could not shutdown instance %s on"
                     " node %s" % (instance.name, src_node))
  
              snap_disks.append(new_dev)
  
        finally:
 -        if self.op.shutdown and instance.admin_up:
 +        if self.op.shutdown and instance.admin_up and not self.remove_instance:
            feedback_fn("Starting instance %s" % instance.name)
            result = self.rpc.call_instance_start(src_node, instance, None, None)
            msg = result.fail_msg
          feedback_fn("Deactivating disks for %s" % instance.name)
          _ShutdownInstanceDisks(self, instance)
  
 +    # Remove instance if requested
 +    if self.remove_instance:
 +      feedback_fn("Removing instance %s" % instance.name)
 +      _RemoveInstance(self, feedback_fn, instance, self.ignore_remove_failures)
 +
      nodelist = self.cfg.GetNodeList()
      nodelist.remove(dst_node.name)
  
            if msg:
              self.LogWarning("Could not remove older export for instance %s"
                              " on node %s: %s", iname, node, msg)
 +
      return fin_resu, dresults
  
  
@@@ -8522,33 -8689,42 +8716,42 @@@ class IAllocator(object)
    # pylint: disable-msg=R0902
    # lots of instance attributes
    _ALLO_KEYS = [
-     "mem_size", "disks", "disk_template",
+     "name", "mem_size", "disks", "disk_template",
      "os", "tags", "nics", "vcpus", "hypervisor",
      ]
    _RELO_KEYS = [
-     "relocate_from",
+     "name", "relocate_from",
+     ]
+   _EVAC_KEYS = [
+     "evac_nodes",
      ]
  
-   def __init__(self, cfg, rpc, mode, name, **kwargs):
+   def __init__(self, cfg, rpc, mode, **kwargs):
      self.cfg = cfg
      self.rpc = rpc
      # init buffer variables
      self.in_text = self.out_text = self.in_data = self.out_data = None
      # init all input fields so that pylint is happy
      self.mode = mode
-     self.name = name
      self.mem_size = self.disks = self.disk_template = None
      self.os = self.tags = self.nics = self.vcpus = None
      self.hypervisor = None
      self.relocate_from = None
+     self.name = None
+     self.evac_nodes = None
      # computed fields
      self.required_nodes = None
      # init result fields
-     self.success = self.info = self.nodes = None
+     self.success = self.info = self.result = None
      if self.mode == constants.IALLOCATOR_MODE_ALLOC:
        keyset = self._ALLO_KEYS
+       fn = self._AddNewInstance
      elif self.mode == constants.IALLOCATOR_MODE_RELOC:
        keyset = self._RELO_KEYS
+       fn = self._AddRelocateInstance
+     elif self.mode == constants.IALLOCATOR_MODE_MEVAC:
+       keyset = self._EVAC_KEYS
+       fn = self._AddEvacuateNodes
      else:
        raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                     " IAllocator" % self.mode)
          raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                       " IAllocator" % key)
        setattr(self, key, kwargs[key])
      for key in keyset:
        if key not in kwargs:
          raise errors.ProgrammerError("Missing input parameter '%s' to"
                                       " IAllocator" % key)
-     self._BuildInputData()
+     self._BuildInputData(fn)
  
    def _ComputeClusterData(self):
      """Compute the generic allocator input data.
        hypervisor_name = self.hypervisor
      elif self.mode == constants.IALLOCATOR_MODE_RELOC:
        hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
+     elif self.mode == constants.IALLOCATOR_MODE_MEVAC:
+       hypervisor_name = cluster_info.enabled_hypervisors[0]
  
      node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
                                          hypervisor_name)
      done.
  
      """
-     data = self.in_data
      disk_space = _ComputeDiskSize(self.disk_template, self.disks)
  
      if self.disk_template in constants.DTS_NET_MIRROR:
      else:
        self.required_nodes = 1
      request = {
-       "type": "allocate",
        "name": self.name,
        "disk_template": self.disk_template,
        "tags": self.tags,
        "nics": self.nics,
        "required_nodes": self.required_nodes,
        }
-     data["request"] = request
+     return request
  
    def _AddRelocateInstance(self):
      """Add relocate instance data to allocator structure.
      disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
  
      request = {
-       "type": "relocate",
        "name": self.name,
        "disk_space_total": disk_space,
        "required_nodes": self.required_nodes,
        "relocate_from": self.relocate_from,
        }
-     self.in_data["request"] = request
+     return request
  
-   def _BuildInputData(self):
+   def _AddEvacuateNodes(self):
+     """Add evacuate nodes data to allocator structure.
+     """
+     request = {
+       "evac_nodes": self.evac_nodes
+       }
+     return request
+ 
+   def _BuildInputData(self, fn):
      """Build input data structures.
  
      """
      self._ComputeClusterData()
  
-     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
-       self._AddNewInstance()
-     else:
-       self._AddRelocateInstance()
+     request = fn()
+     request["type"] = self.mode
+     self.in_data["request"] = request
  
      self.in_text = serializer.Dump(self.in_data)
  
      if not isinstance(rdict, dict):
        raise errors.OpExecError("Can't parse iallocator results: not a dict")
  
-     for key in "success", "info", "nodes":
+     # TODO: remove backwards compatibility in later versions
+     if "nodes" in rdict and "result" not in rdict:
+       rdict["result"] = rdict["nodes"]
+       del rdict["nodes"]
+     for key in "success", "info", "result":
        if key not in rdict:
          raise errors.OpExecError("Can't parse iallocator results:"
                                   " missing key '%s'" % key)
        setattr(self, key, rdict[key])
  
-     if not isinstance(rdict["nodes"], list):
-       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
+     if not isinstance(rdict["result"], list):
+       raise errors.OpExecError("Can't parse iallocator results: 'result' key"
                                 " is not a list")
      self.out_data = rdict
  
@@@ -8867,6 -9055,10 +9082,10 @@@ class LUTestAllocator(NoHooksLU)
        fname = _ExpandInstanceName(self.cfg, self.op.name)
        self.op.name = fname
        self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
+     elif self.op.mode == constants.IALLOCATOR_MODE_MEVAC:
+       if not hasattr(self.op, "evac_nodes"):
+         raise errors.OpPrereqError("Missing attribute 'evac_nodes' on"
+                                    " opcode input", errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                   self.op.mode, errors.ECODE_INVAL)
                         vcpus=self.op.vcpus,
                         hypervisor=self.op.hypervisor,
                         )
-     else:
+     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
        ial = IAllocator(self.cfg, self.rpc,
                         mode=self.op.mode,
                         name=self.op.name,
                         relocate_from=list(self.relocate_from),
                         )
+     elif self.op.mode == constants.IALLOCATOR_MODE_MEVAC:
+       ial = IAllocator(self.cfg, self.rpc,
+                        mode=self.op.mode,
+                        evac_nodes=self.op.evac_nodes)
+     else:
+       raise errors.ProgrammerError("Uncatched mode %s in"
+                                    " LUTestAllocator.Exec", self.op.mode)
  
      if self.op.direction == constants.IALLOCATOR_DIR_IN:
        result = ial.in_text
diff --combined lib/constants.py
@@@ -109,27 -109,43 +109,35 @@@ QUEUE_DIR = DATA_DIR + "/queue
  DAEMON_UTIL = _autoconf.PKGLIBDIR + "/daemon-util"
  ETC_HOSTS = "/etc/hosts"
  DEFAULT_FILE_STORAGE_DIR = _autoconf.FILE_STORAGE_DIR
+ ENABLE_FILE_STORAGE = _autoconf.ENABLE_FILE_STORAGE
  SYSCONFDIR = _autoconf.SYSCONFDIR
  TOOLSDIR = _autoconf.TOOLSDIR
  CONF_DIR = SYSCONFDIR + "/ganeti"
  
+ ALL_CERT_FILES = frozenset([SSL_CERT_FILE, RAPI_CERT_FILE])
  MASTER_SOCKET = SOCKET_DIR + "/ganeti-master"
  
  NODED = "ganeti-noded"
  CONFD = "ganeti-confd"
  RAPI = "ganeti-rapi"
  MASTERD = "ganeti-masterd"
+ # used in the ganeti-nbma project
+ NLD = "ganeti-nld"
  
 -MULTITHREADED_DAEMONS = frozenset([MASTERD])
 -
 -DAEMONS_SSL = {
 -  # daemon-name: (default-cert-path, default-key-path)
 -  NODED: (SSL_CERT_FILE, SSL_CERT_FILE),
 -  RAPI: (RAPI_CERT_FILE, RAPI_CERT_FILE),
 -}
 -
  DAEMONS_PORTS = {
    # daemon-name: ("proto", "default-port")
    NODED: ("tcp", 1811),
    CONFD: ("udp", 1814),
    RAPI: ("tcp", 5080),
-   }
+   # used in the ganeti-nbma project
+   NLD: ("udp", 1816),
+ }
  DEFAULT_NODED_PORT = DAEMONS_PORTS[NODED][1]
  DEFAULT_CONFD_PORT = DAEMONS_PORTS[CONFD][1]
  DEFAULT_RAPI_PORT = DAEMONS_PORTS[RAPI][1]
+ # used in the ganeti-nbma project
+ DEFAULT_NLD_PORT = DAEMONS_PORTS[NLD][1]
  
  FIRST_DRBD_PORT = 11000
  LAST_DRBD_PORT = 14999
@@@ -142,8 -158,9 +150,10 @@@ DAEMONS_LOGFILES = 
    CONFD: LOG_DIR + "conf-daemon.log",
    RAPI: LOG_DIR + "rapi-daemon.log",
    MASTERD: LOG_DIR + "master-daemon.log",
+   # used in the ganeti-nbma project
+   NLD: LOG_DIR + "nl-daemon.log",
    }
 +
  LOG_OS_DIR = LOG_DIR + "os"
  LOG_WATCHER = LOG_DIR + "watcher.log"
  LOG_COMMANDS = LOG_DIR + "commands.log"
@@@ -170,14 -187,6 +180,14 @@@ SOCAT_PATH = _autoconf.SOCAT_PAT
  SOCAT_USE_ESCAPE = _autoconf.SOCAT_USE_ESCAPE
  SOCAT_ESCAPE_CODE = "0x1d"
  
 +# For RSA keys more bits are better, but they also make operations more
 +# expensive. NIST SP 800-131 recommends a minimum of 2048 bits from the year
 +# 2010 on.
 +RSA_KEY_BITS = 2048
 +
 +# Digest used to sign certificates ("openssl x509" uses SHA1 by default)
 +X509_CERT_SIGN_DIGEST = "SHA1"
 +
  VALUE_DEFAULT = "default"
  VALUE_AUTO = "auto"
  VALUE_GENERATE = "generate"
@@@ -193,6 -202,7 +203,7 @@@ HOOKS_BASE_DIR = CONF_DIR + "/hooks
  HOOKS_PHASE_PRE = "pre"
  HOOKS_PHASE_POST = "post"
  HOOKS_NAME_CFGUPDATE = "config-update"
+ HOOKS_NAME_WATCHER = "watcher"
  HOOKS_VERSION = 2
  
  # hooks subject type (what object type does the LU deal with)
@@@ -337,6 -347,13 +348,13 @@@ LVM_STRIPECOUNT = _autoconf.LVM_STRIPEC
  DEFAULT_SHUTDOWN_TIMEOUT = 120
  NODE_MAX_CLOCK_SKEW = 150
  
+ # runparts results
+ (RUNPARTS_SKIP,
+  RUNPARTS_RUN,
+  RUNPARTS_ERR) = range(3)
+ RUNPARTS_STATUS = frozenset([RUNPARTS_SKIP, RUNPARTS_RUN, RUNPARTS_ERR])
+ 
  # RPC constants
  (RPC_ENCODING_NONE,
   RPC_ENCODING_ZLIB_BASE64) = range(2)
@@@ -353,10 -370,10 +371,10 @@@ OS_API_FILE = 'ganeti_api_version
  OS_VARIANTS_FILE = 'variants.list'
  
  # ssh constants
- SSH_CONFIG_DIR = "/etc/ssh/"
- SSH_HOST_DSA_PRIV = SSH_CONFIG_DIR + "ssh_host_dsa_key"
+ SSH_CONFIG_DIR = _autoconf.SSH_CONFIG_DIR
+ SSH_HOST_DSA_PRIV = SSH_CONFIG_DIR + "/ssh_host_dsa_key"
  SSH_HOST_DSA_PUB = SSH_HOST_DSA_PRIV + ".pub"
- SSH_HOST_RSA_PRIV = SSH_CONFIG_DIR + "ssh_host_rsa_key"
+ SSH_HOST_RSA_PRIV = SSH_CONFIG_DIR + "/ssh_host_rsa_key"
  SSH_HOST_RSA_PUB = SSH_HOST_RSA_PRIV + ".pub"
  SSH = "ssh"
  SCP = "scp"
@@@ -407,6 -424,8 +425,8 @@@ HV_INIT_SCRIPT = "init_script
  HV_MIGRATION_PORT = "migration_port"
  HV_USE_LOCALTIME = "use_localtime"
  HV_DISK_CACHE = "disk_cache"
+ HV_SECURITY_MODEL = "security_model"
+ HV_SECURITY_DOMAIN = "security_domain"
  
  HVS_PARAMETER_TYPES = {
    HV_BOOT_ORDER: VTYPE_STRING,
    HV_MIGRATION_PORT: VTYPE_INT,
    HV_USE_LOCALTIME: VTYPE_BOOL,
    HV_DISK_CACHE: VTYPE_STRING,
+   HV_SECURITY_MODEL: VTYPE_STRING,
+   HV_SECURITY_DOMAIN: VTYPE_STRING,
    }
  
  HVS_PARAMETERS = frozenset(HVS_PARAMETER_TYPES.keys())
@@@ -535,6 -556,13 +557,13 @@@ HT_BO_NETWORK = "network
  
  HT_KVM_VALID_BO_TYPES = frozenset([HT_BO_CDROM, HT_BO_DISK, HT_BO_NETWORK])
  
+ # Security models
+ HT_SM_NONE = "none"
+ HT_SM_USER = "user"
+ HT_SM_POOL = "pool"
+ HT_KVM_VALID_SM_TYPES = frozenset([HT_SM_NONE, HT_SM_USER, HT_SM_POOL])
+ 
  # Cluster Verify steps
  VERIFY_NPLUSONE_MEM = 'nplusone_mem'
  VERIFY_OPTIONAL_CHECKS = frozenset([VERIFY_NPLUSONE_MEM])
@@@ -554,12 -582,17 +583,17 @@@ NV_DRBDLIST = "drbd-list
  NV_NODESETUP = "nodesetup"
  NV_TIME = "time"
  
+ # SSL certificate check constants (in days)
+ SSL_CERT_EXPIRATION_WARN = 30
+ SSL_CERT_EXPIRATION_ERROR = 7
+ 
  # Allocator framework constants
  IALLOCATOR_VERSION = 2
  IALLOCATOR_DIR_IN = "in"
  IALLOCATOR_DIR_OUT = "out"
  IALLOCATOR_MODE_ALLOC = "allocate"
  IALLOCATOR_MODE_RELOC = "relocate"
+ IALLOCATOR_MODE_MEVAC = "multi-evacuate"
  IALLOCATOR_SEARCH_PATH = _autoconf.IALLOCATOR_SEARCH_PATH
  
  # Job queue
@@@ -673,6 -706,8 +707,8 @@@ HVC_DEFAULTS = 
      HV_MIGRATION_PORT: 8102,
      HV_USE_LOCALTIME: False,
      HV_DISK_CACHE: HT_CACHE_DEFAULT,
+     HV_SECURITY_MODEL: HT_SM_NONE,
+     HV_SECURITY_DOMAIN: '',
      },
    HT_FAKE: {
      },
diff --combined lib/http/__init__.py
@@@ -32,6 -32,7 +32,6 @@@ import errn
  from cStringIO import StringIO
  
  from ganeti import constants
 -from ganeti import serializer
  from ganeti import utils
  
  
@@@ -65,9 -66,6 +65,9 @@@ HTTP_AUTHORIZATION = "Authorization
  HTTP_AUTHENTICATION_INFO = "Authentication-Info"
  HTTP_ALLOW = "Allow"
  
 +HTTP_APP_OCTET_STREAM = "application/octet-stream"
 +HTTP_APP_JSON = "application/json"
 +
  _SSL_UNEXPECTED_EOF = "Unexpected EOF"
  
  # Socket operations
@@@ -180,17 -178,6 +180,17 @@@ class HttpMethodNotAllowed(HttpExceptio
    code = 405
  
  
 +class HttpNotAcceptable(HttpException):
 +  """406 Not Acceptable
 +
 +  RFC2616, 10.4.7: The resource identified by the request is only capable of
 +  generating response entities which have content characteristics not
 +  acceptable according to the accept headers sent in the request.
 +
 +  """
 +  code = 406
 +
 +
  class HttpRequestTimeout(HttpException):
    """408 Request Timeout
  
@@@ -248,17 -235,6 +248,17 @@@ class HttpPreconditionFailed(HttpExcept
    code = 412
  
  
 +class HttpUnsupportedMediaType(HttpException):
 +  """415 Unsupported Media Type
 +
 +  RFC2616, 10.4.16: The server is refusing to service the request because the
 +  entity of the request is in a format not supported by the requested resource
 +  for the requested method.
 +
 +  """
 +  code = 415
 +
 +
  class HttpInternalServerError(HttpException):
    """500 Internal Server Error
  
@@@ -323,6 -299,18 +323,6 @@@ class HttpVersionNotSupported(HttpExcep
    code = 505
  
  
 -class HttpJsonConverter: # pylint: disable-msg=W0232
 -  CONTENT_TYPE = "application/json"
 -
 -  @staticmethod
 -  def Encode(data):
 -    return serializer.DumpJson(data)
 -
 -  @staticmethod
 -  def Decode(data):
 -    return serializer.LoadJson(data)
 -
 -
  def WaitForSocketCondition(sock, event, timeout):
    """Waits for a condition to occur on the socket.
  
@@@ -415,9 -403,9 +415,9 @@@ def SocketOperation(sock, op, arg1, tim
        if event is None:
          raise HttpSocketTimeout()
  
-       if (op == SOCKOP_RECV and
-           event & (select.POLLNVAL | select.POLLHUP | select.POLLERR)):
-         return ""
+       if event & (select.POLLNVAL | select.POLLHUP | select.POLLERR):
+         # Let the socket functions handle these
+         break
  
        if not event & wait_for_event:
          continue
@@@ -668,6 -656,7 +668,6 @@@ class HttpMessage(object)
      self.start_line = None
      self.headers = None
      self.body = None
 -    self.decoded_body = None
  
  
  class HttpClientToServerStartLine(object):
@@@ -826,7 -815,7 +826,7 @@@ class HttpMessageReader(object)
        buf = self._ContinueParsing(buf, eof)
  
        # Must be done only after the buffer has been evaluated
 -      # TODO: Connection-length < len(data read) and connection closed
 +      # TODO: Content-Length < len(data read) and connection closed
        if (eof and
            self.parser_status in (self.PS_START_LINE,
                                   self.PS_HEADERS)):
      assert self.parser_status == self.PS_COMPLETE
      assert not buf, "Parser didn't read full response"
  
 +    # Body is complete
      msg.body = self.body_buffer.getvalue()
  
 -    # TODO: Content-type, error handling
 -    if msg.body:
 -      msg.decoded_body = HttpJsonConverter().Decode(msg.body)
 -    else:
 -      msg.decoded_body = None
 -
 -    if msg.decoded_body:
 -      logging.debug("Message body: %s", msg.decoded_body)
 -
    def _ContinueParsing(self, buf, eof):
      """Main function for HTTP message state machine.
  
          # the CRLF."
          if idx == 0:
            # TODO: Limit number of CRLFs/empty lines for safety?
-           buf = buf[:2]
+           buf = buf[2:]
            continue
  
          if idx > 0:
diff --combined lib/http/server.py
@@@ -74,12 -74,12 +74,12 @@@ class _HttpServerRequest(object)
    """Data structure for HTTP request on server side.
  
    """
-   def __init__(self, request_msg):
+   def __init__(self, method, path, headers, body):
      # Request attributes
-     self.request_method = request_msg.start_line.method
-     self.request_path = request_msg.start_line.path
-     self.request_headers = request_msg.headers
-     self.request_body = request_msg.body
+     self.request_method = method
+     self.request_path = path
+     self.request_headers = headers
+     self.request_body = body
  
      # Response attributes
      self.resp_headers = {}
      # authentication)
      self.private = None
  
+   def __repr__(self):
+     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
+               self.request_method, self.request_path,
+               "headers=%r" % str(self.request_headers),
+               "body=%r" % (self.request_body, )]
+     return "<%s at %#x>" % (" ".join(status), id(self))
  
  class _HttpServerToClientMessageWriter(http.HttpMessageWriter):
    """Writes an HTTP response to client.
@@@ -269,14 -277,6 +277,14 @@@ class HttpServerRequestExecutor(object)
          try:
            try:
              request_msg_reader = self._ReadRequest()
 +
 +            # RFC2616, 14.23: All Internet-based HTTP/1.1 servers MUST respond
 +            # with a 400 (Bad Request) status code to any HTTP/1.1 request
 +            # message which lacks a Host header field.
 +            if (self.request_msg.start_line.version == http.HTTP_1_1 and
 +                http.HTTP_HOST not in self.request_msg.headers):
 +              raise http.HttpBadRequest(message="Missing Host header")
 +
              self._HandleRequest()
  
              # Only wait for client to close if we didn't have any exception.
      """Calls the handler function for the current request.
  
      """
-     handler_context = _HttpServerRequest(self.request_msg)
+     handler_context = _HttpServerRequest(self.request_msg.start_line.method,
+                                          self.request_msg.start_line.path,
+                                          self.request_msg.headers,
 -                                         self.request_msg.decoded_body)
++                                         self.request_msg.body)
+     logging.debug("Handling request %r", handler_context)
  
      try:
        try:
          logging.exception("Unknown exception")
          raise http.HttpInternalServerError(message="Unknown error")
  
 -      # TODO: Content-type
 -      encoder = http.HttpJsonConverter()
 +      if not isinstance(result, basestring):
 +        raise http.HttpError("Handler function didn't return string type")
 +
        self.response_msg.start_line.code = http.HTTP_OK
 -      self.response_msg.body = encoder.Encode(result)
        self.response_msg.headers = handler_context.resp_headers
 -      self.response_msg.headers[http.HTTP_CONTENT_TYPE] = encoder.CONTENT_TYPE
 +      self.response_msg.body = result
      finally:
        # No reason to keep this any longer, even for exceptions
        handler_context.private = None
      """
      return self.error_message_format % values
  
 +
  class HttpServer(http.HttpBase, asyncore.dispatcher):
    """Generic HTTP server class
  
diff --combined lib/luxi.py
@@@ -33,15 -33,15 +33,16 @@@ import socke
  import collections
  import time
  import errno
 +import logging
  
  from ganeti import serializer
  from ganeti import constants
  from ganeti import errors
+ from ganeti import utils
  
  
 -KEY_METHOD = 'method'
 -KEY_ARGS = 'args'
 +KEY_METHOD = "method"
 +KEY_ARGS = "args"
  KEY_SUCCESS = "success"
  KEY_RESULT = "result"
  
@@@ -65,8 -65,8 +66,8 @@@ DEF_CTMO = 1
  DEF_RWTO = 60
  
  
 -class ProtocolError(Exception):
 -  """Denotes an error in the server communication"""
 +class ProtocolError(errors.GenericError):
 +  """Denotes an error in the LUXI protocol"""
  
  
  class ConnectionClosedError(ProtocolError):
@@@ -77,6 -77,14 +78,6 @@@ class TimeoutError(ProtocolError)
    """Operation timeout error"""
  
  
 -class EncodingError(ProtocolError):
 -  """Encoding failure on the sending side"""
 -
 -
 -class DecodingError(ProtocolError):
 -  """Decoding failure on the receiving side"""
 -
 -
  class RequestError(ProtocolError):
    """Error on request
  
@@@ -149,15 -157,14 +150,14 @@@ class Transport
  
      try:
        self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
-       self.socket.settimeout(self._ctimeout)
+       # Try to connect
        try:
-         self.socket.connect(address)
-       except socket.timeout, err:
-         raise TimeoutError("Connect timed out: %s" % str(err))
-       except socket.error, err:
-         if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
-           raise NoMasterError(address)
-         raise
+         utils.Retry(self._Connect, 1.0, self._ctimeout,
+                     args=(self.socket, address, self._ctimeout))
+       except utils.RetryTimeout:
+         raise TimeoutError("Connect timed out")
        self.socket.settimeout(self._rwtimeout)
      except (socket.error, NoMasterError):
        if self.socket is not None:
        self.socket = None
        raise
  
+   @staticmethod
+   def _Connect(sock, address, timeout):
+     sock.settimeout(timeout)
+     try:
+       sock.connect(address)
+     except socket.timeout, err:
+       raise TimeoutError("Connect timed out: %s" % str(err))
+     except socket.error, err:
+       if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
+         raise NoMasterError(address)
+       if err.args[0] == errno.EAGAIN:
+         # Server's socket backlog is full at the moment
+         raise utils.RetryAgain()
+       raise
+ 
    def _CheckSocket(self):
      """Make sure we are connected.
  
  
      """
      if self.eom in msg:
 -      raise EncodingError("Message terminator found in payload")
 +      raise ProtocolError("Message terminator found in payload")
 +
      self._CheckSocket()
      try:
        # TODO: sendall is not guaranteed to send everything
        self.socket = None
  
  
 +def ParseRequest(msg):
 +  """Parses a LUXI request message.
 +
 +  """
 +  try:
 +    request = serializer.LoadJson(msg)
 +  except ValueError, err:
 +    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
 +
 +  logging.debug("LUXI request: %s", request)
 +
 +  if not isinstance(request, dict):
 +    logging.error("LUXI request not a dict: %r", msg)
 +    raise ProtocolError("Invalid LUXI request (not a dict)")
 +
 +  method = request.get(KEY_METHOD, None)
 +  args = request.get(KEY_ARGS, None)
 +  if method is None or args is None:
 +    logging.error("LUXI request missing method or arguments: %r", msg)
 +    raise ProtocolError(("Invalid LUXI request (no method or arguments"
 +                         " in request): %r") % msg)
 +
 +  return (method, args)
 +
 +
 +def ParseResponse(msg):
 +  """Parses a LUXI response message.
 +
 +  """
 +  # Parse the result
 +  try:
 +    data = serializer.LoadJson(msg)
 +  except Exception, err:
 +    raise ProtocolError("Error while deserializing response: %s" % str(err))
 +
 +  # Validate response
 +  if not (isinstance(data, dict) and
 +          KEY_SUCCESS in data and
 +          KEY_RESULT in data):
 +    raise ProtocolError("Invalid response from server: %r" % data)
 +
 +  return (data[KEY_SUCCESS], data[KEY_RESULT])
 +
 +
 +def FormatResponse(success, result):
 +  """Formats a LUXI response message.
 +
 +  """
 +  response = {
 +    KEY_SUCCESS: success,
 +    KEY_RESULT: result,
 +    }
 +
 +  logging.debug("LUXI response: %s", response)
 +
 +  return serializer.DumpJson(response)
 +
 +
 +def FormatRequest(method, args):
 +  """Formats a LUXI request message.
 +
 +  """
 +  # Build request
 +  request = {
 +    KEY_METHOD: method,
 +    KEY_ARGS: args,
 +    }
 +
 +  # Serialize the request
 +  return serializer.DumpJson(request, indent=False)
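 +
 +# Round-trip sketch (method name illustrative): ParseRequest(
 +# FormatRequest("QueryClusterInfo", [])) returns ("QueryClusterInfo", []).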
 +
 +
 +def CallLuxiMethod(transport_cb, method, args):
 +  """Send a LUXI request via a transport and return the response.
 +
 +  """
 +  assert callable(transport_cb)
 +
 +  request_msg = FormatRequest(method, args)
 +
 +  # Send request and wait for response
 +  response_msg = transport_cb(request_msg)
 +
 +  (success, result) = ParseResponse(response_msg)
 +
 +  if success:
 +    return result
 +
 +  errors.MaybeRaise(result)
 +  raise RequestError(result)
 +
 +
  class Client(object):
    """High-level client implementation.
  
      except Exception: # pylint: disable-msg=W0703
        pass
  
 -  def CallMethod(self, method, args):
 -    """Send a generic request and return the response.
 -
 -    """
 -    # Build request
 -    request = {
 -      KEY_METHOD: method,
 -      KEY_ARGS: args,
 -      }
 -
 -    # Serialize the request
 -    send_data = serializer.DumpJson(request, indent=False)
 -
 +  def _SendMethodCall(self, data):
      # Send request and wait for response
      try:
        self._InitTransport()
 -      result = self.transport.Call(send_data)
 +      return self.transport.Call(data)
      except Exception:
        self._CloseTransport()
        raise
  
 -    # Parse the result
 -    try:
 -      data = serializer.LoadJson(result)
 -    except Exception, err:
 -      raise ProtocolError("Error while deserializing response: %s" % str(err))
 -
 -    # Validate response
 -    if (not isinstance(data, dict) or
 -        KEY_SUCCESS not in data or
 -        KEY_RESULT not in data):
 -      raise DecodingError("Invalid response from server: %s" % str(data))
 -
 -    result = data[KEY_RESULT]
 -
 -    if not data[KEY_SUCCESS]:
 -      errors.MaybeRaise(result)
 -      raise RequestError(result)
 +  def CallMethod(self, method, args):
 +    """Send a generic request and return the response.
  
 -    return result
 +    """
 +    return CallLuxiMethod(self._SendMethodCall, method, args)
  
    def SetQueueDrainFlag(self, drain_flag):
      return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
      timeout = (DEF_RWTO - 1) / 2
      return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
  
-   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
+   def WaitForJobChangeOnce(self, job_id, fields,
+                            prev_job_info, prev_log_serial):
      timeout = (DEF_RWTO - 1) / 2
+     return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
+                            (job_id, fields, prev_job_info,
+                             prev_log_serial, timeout))
+ 
+   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
      while True:
-       result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
-                                (job_id, fields, prev_job_info,
-                                 prev_log_serial, timeout))
+       result = self.WaitForJobChangeOnce(job_id, fields,
+                                          prev_job_info, prev_log_serial)
        if result != constants.JOB_NOTCHANGED:
          break
      return result
  
    def QueryTags(self, kind, name):
      return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
- # TODO: class Server(object)
diff --combined lib/objects.py
@@@ -48,7 -48,6 +48,7 @@@ __all__ = ["ConfigObject", "ConfigData"
  _TIMESTAMPS = ["ctime", "mtime"]
  _UUID = ["uuid"]
  
 +
  def FillDict(defaults_dict, custom_dict, skip_keys=None):
    """Basic function to apply settings on top a default dict.
  
@@@ -412,6 -411,9 +412,9 @@@ class Disk(ConfigObject)
      irrespective of their status. For such devices, we return this
      path, for others we return None.
  
+     @warning: The path returned is not a normalized pathname; callers
+         should check that it is a valid path.
+ 
      """
      if self.dev_type == constants.LD_LV:
        return "/dev/%s/%s" % (self.logical_id[0], self.logical_id[1])
@@@ -856,6 -858,7 +859,7 @@@ class Cluster(TaggableObject)
      "file_storage_dir",
      "enabled_hypervisors",
      "hvparams",
+     "os_hvp",
      "beparams",
      "nicparams",
      "candidate_pool_size",
          self.hvparams[hypervisor] = FillDict(
              constants.HVC_DEFAULTS[hypervisor], self.hvparams[hypervisor])
  
+     # TODO: Figure out if it's better to put this into OS than Cluster
+     if self.os_hvp is None:
+       self.os_hvp = {}
      self.beparams = UpgradeGroupedParams(self.beparams,
                                           constants.BEC_DEFAULTS)
      migrate_default_bridge = not self.nicparams
        skip_keys = constants.HVC_GLOBALS
      else:
        skip_keys = []
-     return FillDict(self.hvparams.get(instance.hypervisor, {}),
-                     instance.hvparams, skip_keys=skip_keys)
+     # We fill the list from least to most important override
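+     # (illustrative: a hypervisor parameter set in os_hvp overrides the
+     # cluster-wide default, and an instance-level value overrides both)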
+     fill_stack = [
+       self.hvparams.get(instance.hypervisor, {}),
+       self.os_hvp.get(instance.os, {}).get(instance.hypervisor, {}),
+       instance.hvparams,
+       ]
+     ret_dict = {}
+     for o_dict in fill_stack:
+       ret_dict = FillDict(ret_dict, o_dict, skip_keys=skip_keys)
+     return ret_dict
  
    def FillBE(self, instance):
      """Fill an instance's beparams dict.
diff --combined lib/opcodes.py
@@@ -90,7 -90,7 +90,7 @@@ class BaseOpCode(object)
                         type(state))
  
      for name in self._all_slots():
-       if name not in state:
+       if name not in state and hasattr(self, name):
          delattr(self, name)
  
      for name in state:
@@@ -301,6 -301,7 +301,7 @@@ class OpSetClusterParams(OpCode)
      "vg_name",
      "enabled_hypervisors",
      "hvparams",
+     "os_hvp",
      "beparams",
      "nicparams",
      "candidate_pool_size",
@@@ -412,6 -413,7 +413,7 @@@ class OpSetNodeParams(OpCode)
      "master_candidate",
      "offline",
      "drained",
+     "auto_promote",
      ]
  
  
@@@ -444,6 -446,13 +446,13 @@@ class OpMigrateNode(OpCode)
      ]
  
  
+ class OpNodeEvacuationStrategy(OpCode):
+   """Compute the evacuation strategy for a list of nodes."""
+   OP_ID = "OP_NODE_EVAC_STRATEGY"
+   OP_DSC_FIELD = "nodes"
+   __slots__ = ["nodes", "iallocator", "remote_node"]
  # instance opcodes
  
  class OpCreateInstance(OpCode):
@@@ -644,8 -653,6 +653,8 @@@ class OpExportInstance(OpCode)
    OP_DSC_FIELD = "instance_name"
    __slots__ = [
      "instance_name", "target_node", "shutdown", "shutdown_timeout",
 +    "remove_instance",
 +    "ignore_remove_failures",
      ]
  
  
@@@ -727,6 -734,7 +736,7 @@@ class OpTestAllocator(OpCode)
      "direction", "mode", "allocator", "name",
      "mem_size", "disks", "disk_template",
      "os", "tags", "nics", "vcpus", "hypervisor",
+     "evac_nodes",
      ]
  
  
diff --combined lib/utils.py
@@@ -28,7 -28,6 +28,7 @@@ the command line scripts
  
  
  import os
 +import sys
  import time
  import subprocess
  import re
@@@ -44,7 -43,8 +44,9 @@@ import resourc
  import logging
  import logging.handlers
  import signal
 +import OpenSSL
+ import datetime
+ import calendar
  
  from cStringIO import StringIO
  
@@@ -120,18 -120,7 +122,23 @@@ class RunResult(object)
    output = property(_GetOutput, None, None, "Return full output")
  
  
- def _BuildCmdEnvironment(env):
 -def RunCmd(cmd, env=None, output=None, cwd='/', reset_env=False):
++def _BuildCmdEnvironment(env, reset):
 +  """Builds the environment for an external program.
 +
 +  """
-   cmd_env = os.environ.copy()
-   cmd_env["LC_ALL"] = "C"
++  if reset:
++    cmd_env = {}
++  else:
++    cmd_env = os.environ.copy()
++    cmd_env["LC_ALL"] = "C"
++
 +  if env is not None:
 +    cmd_env.update(env)
++
 +  return cmd_env
 +
 +
- def RunCmd(cmd, env=None, output=None, cwd="/"):
++def RunCmd(cmd, env=None, output=None, cwd="/", reset_env=False):
    """Execute a (shell) command.
  
    The command should not read from its standard input, as it will be
    @type cmd: string or list
    @param cmd: Command to run
    @type env: dict
 -  @param env: Additional environment
 +  @param env: Additional environment variables
    @type output: str
    @param output: if desired, the output of the command can be
        saved in a file instead of the RunResult instance; this
    @type cwd: string
    @param cwd: if specified, will be used as the working
        directory for the command; the default will be /
+   @type reset_env: boolean
+   @param reset_env: whether to reset or keep the default os environment
    @rtype: L{RunResult}
    @return: RunResult instance
    @raise errors.ProgrammerError: if we call this when forks are disabled
    if no_fork:
      raise errors.ProgrammerError("utils.RunCmd() called with fork() disabled")
  
 -  if isinstance(cmd, list):
 -    cmd = [str(val) for val in cmd]
 -    strcmd = " ".join(cmd)
 -    shell = False
 -  else:
 +  if isinstance(cmd, basestring):
      strcmd = cmd
      shell = True
 -  logging.debug("RunCmd '%s'", strcmd)
 +  else:
 +    cmd = [str(val) for val in cmd]
 +    strcmd = ShellQuoteArgs(cmd)
 +    shell = False
  
 -  if not reset_env:
 -    cmd_env = os.environ.copy()
 -    cmd_env["LC_ALL"] = "C"
 +  if output:
 +    logging.debug("RunCmd %s, output file '%s'", strcmd, output)
    else:
 -    cmd_env = {}
 +    logging.debug("RunCmd %s", strcmd)
  
-   cmd_env = _BuildCmdEnvironment(env)
 -  if env is not None:
 -    cmd_env.update(env)
++  cmd_env = _BuildCmdEnvironment(env, reset_env)
  
    try:
      if output is None:
    return RunResult(exitcode, signal_, out, err, strcmd)
  
  
 +def StartDaemon(cmd, env=None, cwd="/", output=None, output_fd=None,
 +                pidfile=None):
 +  """Start a daemon process after forking twice.
 +
 +  @type cmd: string or list
 +  @param cmd: Command to run
 +  @type env: dict
 +  @param env: Additional environment variables
 +  @type cwd: string
 +  @param cwd: Working directory for the program
 +  @type output: string
 +  @param output: Path to file in which to save the output
 +  @type output_fd: int
 +  @param output_fd: File descriptor for output
 +  @type pidfile: string
 +  @param pidfile: Process ID file
 +  @rtype: int
 +  @return: Daemon process ID
 +  @raise errors.ProgrammerError: if we call this when forks are disabled
 +
 +  """
 +  if no_fork:
 +    raise errors.ProgrammerError("utils.StartDaemon() called with fork()"
 +                                 " disabled")
 +
 +  if output and not (bool(output) ^ (output_fd is not None)):
 +    raise errors.ProgrammerError("Only one of 'output' and 'output_fd' can be"
 +                                 " specified")
 +
 +  if isinstance(cmd, basestring):
 +    cmd = ["/bin/sh", "-c", cmd]
 +
 +  strcmd = ShellQuoteArgs(cmd)
 +
 +  if output:
 +    logging.debug("StartDaemon %s, output file '%s'", strcmd, output)
 +  else:
 +    logging.debug("StartDaemon %s", strcmd)
 +
-   cmd_env = _BuildCmdEnvironment(env)
++  cmd_env = _BuildCmdEnvironment(env, False)
 +
 +  # Create pipe for sending PID back
 +  (pidpipe_read, pidpipe_write) = os.pipe()
 +  try:
 +    try:
 +      # Create pipe for sending error messages
 +      (errpipe_read, errpipe_write) = os.pipe()
 +      try:
 +        try:
 +          # First fork
 +          pid = os.fork()
 +          if pid == 0:
 +            try:
 +              # Child process, won't return
 +              _StartDaemonChild(errpipe_read, errpipe_write,
 +                                pidpipe_read, pidpipe_write,
 +                                cmd, cmd_env, cwd,
 +                                output, output_fd, pidfile)
 +            finally:
 +              # Well, maybe child process failed
 +              os._exit(1) # pylint: disable-msg=W0212
 +        finally:
 +          _CloseFDNoErr(errpipe_write)
 +
 +        # Wait for daemon to be started (or an error message to arrive) and read
 +        # up to 100 KB as an error message
 +        errormsg = RetryOnSignal(os.read, errpipe_read, 100 * 1024)
 +      finally:
 +        _CloseFDNoErr(errpipe_read)
 +    finally:
 +      _CloseFDNoErr(pidpipe_write)
 +
 +    # Read up to 128 bytes for PID
 +    pidtext = RetryOnSignal(os.read, pidpipe_read, 128)
 +  finally:
 +    _CloseFDNoErr(pidpipe_read)
 +
 +  # Try to avoid zombies by waiting for child process
 +  try:
 +    os.waitpid(pid, 0)
 +  except OSError:
 +    pass
 +
 +  if errormsg:
 +    raise errors.OpExecError("Error when starting daemon process: %r" %
 +                             errormsg)
 +
 +  try:
 +    return int(pidtext)
 +  except (ValueError, TypeError), err:
 +    raise errors.OpExecError("Error while trying to parse PID %r: %s" %
 +                             (pidtext, err))
 +
 +
 +def _StartDaemonChild(errpipe_read, errpipe_write,
 +                      pidpipe_read, pidpipe_write,
 +                      args, env, cwd,
 +                      output, fd_output, pidfile):
 +  """Child process for starting daemon.
 +
 +  """
 +  try:
 +    # Close parent's side
 +    _CloseFDNoErr(errpipe_read)
 +    _CloseFDNoErr(pidpipe_read)
 +
 +    # First child process
 +    os.chdir("/")
 +    os.umask(077)
 +    os.setsid()
 +
 +    # And fork for the second time
 +    pid = os.fork()
 +    if pid != 0:
 +      # Exit first child process
 +      os._exit(0) # pylint: disable-msg=W0212
 +
 +    # Make sure pipe is closed on execv* (and thereby notifies original process)
 +    SetCloseOnExecFlag(errpipe_write, True)
 +
 +    # List of file descriptors to be left open
 +    noclose_fds = [errpipe_write]
 +
 +    # Open PID file
 +    if pidfile:
 +      try:
 +        # TODO: Atomic replace with another locked file instead of writing into
 +        # it after creating
 +        fd_pidfile = os.open(pidfile, os.O_WRONLY | os.O_CREAT, 0600)
 +
 +        # Lock the PID file (and fail if not possible to do so). Any code
 +        # wanting to send a signal to the daemon should try to lock the PID
 +        # file before reading it. If acquiring the lock succeeds, the daemon is
 +        # no longer running and the signal should not be sent.
 +        LockFile(fd_pidfile)
 +
 +        os.write(fd_pidfile, "%d\n" % os.getpid())
 +      except Exception, err:
 +        raise Exception("Creating and locking PID file failed: %s" % err)
 +
 +      # Keeping the file open to hold the lock
 +      noclose_fds.append(fd_pidfile)
 +
 +      SetCloseOnExecFlag(fd_pidfile, False)
 +    else:
 +      fd_pidfile = None
 +
 +    # Open /dev/null
 +    fd_devnull = os.open(os.devnull, os.O_RDWR)
 +
 +    assert not output or (bool(output) ^ (fd_output is not None))
 +
 +    if fd_output is not None:
 +      pass
 +    elif output:
 +      # Open output file
 +      try:
 +        # TODO: Implement flag to set append=yes/no
 +        fd_output = os.open(output, os.O_WRONLY | os.O_CREAT, 0600)
 +      except EnvironmentError, err:
 +        raise Exception("Opening output file failed: %s" % err)
 +    else:
 +      fd_output = fd_devnull
 +
 +    # Redirect standard I/O
 +    os.dup2(fd_devnull, 0)
 +    os.dup2(fd_output, 1)
 +    os.dup2(fd_output, 2)
 +
 +    # Send daemon PID to parent
 +    RetryOnSignal(os.write, pidpipe_write, str(os.getpid()))
 +
 +    # Close all file descriptors except stdio and error message pipe
 +    CloseFDs(noclose_fds=noclose_fds)
 +
 +    # Change working directory
 +    os.chdir(cwd)
 +
 +    if env is None:
 +      os.execvp(args[0], args)
 +    else:
 +      os.execvpe(args[0], args, env)
 +  except: # pylint: disable-msg=W0702
 +    try:
 +      # Report errors to original process
 +      buf = str(sys.exc_info()[1])
 +
 +      RetryOnSignal(os.write, errpipe_write, buf)
 +    except: # pylint: disable-msg=W0702
 +      # Ignore errors in error handling
 +      pass
 +
 +  os._exit(1) # pylint: disable-msg=W0212
 +
 +
  def _RunCmdPipe(cmd, env, via_shell, cwd):
    """Run a command and return its output.
  
      child.stderr.fileno(): (err, child.stderr),
      }
    for fd in fdmap:
 -    status = fcntl.fcntl(fd, fcntl.F_GETFL)
 -    fcntl.fcntl(fd, fcntl.F_SETFL, status | os.O_NONBLOCK)
 +    SetNonblockFlag(fd, True)
  
    while fdmap:
 -    try:
 -      pollresult = poller.poll()
 -    except EnvironmentError, eerr:
 -      if eerr.errno == errno.EINTR:
 -        continue
 -      raise
 -    except select.error, serr:
 -      if serr[0] == errno.EINTR:
 -        continue
 -      raise
 +    pollresult = RetryOnSignal(poller.poll)
  
      for fd, event in pollresult:
        if event & select.POLLIN or event & select.POLLPRI:
@@@ -481,59 -290,43 +490,96 @@@ def _RunCmdFile(cmd, env, via_shell, ou
    return status
  
  
 +def SetCloseOnExecFlag(fd, enable):
 +  """Sets or unsets the close-on-exec flag on a file descriptor.
 +
 +  @type fd: int
 +  @param fd: File descriptor
 +  @type enable: bool
 +  @param enable: Whether to set or unset it.
 +
 +  """
 +  flags = fcntl.fcntl(fd, fcntl.F_GETFD)
 +
 +  if enable:
 +    flags |= fcntl.FD_CLOEXEC
 +  else:
 +    flags &= ~fcntl.FD_CLOEXEC
 +
 +  fcntl.fcntl(fd, fcntl.F_SETFD, flags)
 +
 +
 +def SetNonblockFlag(fd, enable):
 +  """Sets or unsets the O_NONBLOCK flag on on a file descriptor.
 +
 +  @type fd: int
 +  @param fd: File descriptor
 +  @type enable: bool
 +  @param enable: Whether to set or unset it
 +
 +  """
 +  flags = fcntl.fcntl(fd, fcntl.F_GETFL)
 +
 +  if enable:
 +    flags |= os.O_NONBLOCK
 +  else:
 +    flags &= ~os.O_NONBLOCK
 +
 +  fcntl.fcntl(fd, fcntl.F_SETFL, flags)
 +
 +
 +def RetryOnSignal(fn, *args, **kwargs):
 +  """Calls a function again if it failed due to EINTR.
 +
 +  """
 +  while True:
 +    try:
 +      return fn(*args, **kwargs)
 +    except EnvironmentError, err:
 +      if err.errno != errno.EINTR:
 +        raise
 +    except select.error, err:
 +      if not (err.args and err.args[0] == errno.EINTR):
 +        raise
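 +
 +# Example use (sketch): RetryOnSignal(os.read, fd, 1024) retries the read
 +# when the call is interrupted by a signal (EINTR).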
 +
 +
+ def RunParts(dir_name, env=None, reset_env=False):
+   """Run Scripts or programs in a directory
+   @type dir_name: string
+   @param dir_name: absolute path to a directory
+   @type env: dict
+   @param env: The environment to use
+   @type reset_env: boolean
+   @param reset_env: whether to reset or keep the default os environment
+   @rtype: list of tuples
+   @return: list of (name, (one of RUNDIR_STATUS), RunResult)
+   """
+   rr = []
+   try:
+     dir_contents = ListVisibleFiles(dir_name)
+   except OSError, err:
+     logging.warning("RunParts: skipping %s (cannot list: %s)", dir_name, err)
+     return rr
+   for relname in sorted(dir_contents):
+     fname = PathJoin(dir_name, relname)
+     if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
+             constants.EXT_PLUGIN_MASK.match(relname) is not None):
+       rr.append((relname, constants.RUNPARTS_SKIP, None))
+     else:
+       try:
+         result = RunCmd([fname], env=env, reset_env=reset_env)
+       except Exception, err: # pylint: disable-msg=W0703
+         rr.append((relname, constants.RUNPARTS_ERR, str(err)))
+       else:
+         rr.append((relname, constants.RUNPARTS_RUN, result))
+   return rr
+ 
+ 
  def RemoveFile(filename):
    """Remove a file ignoring some errors.
  
@@@ -824,6 -617,8 +870,8 @@@ class HostInfo
    """Class implementing resolver and hostname functionality
  
    """
+   _VALID_NAME_RE = re.compile("^[a-z0-9._-]{1,255}$")
+ 
    def __init__(self, name=None):
      """Initialize the host name object.
  
  
      return result
  
+   @classmethod
+   def NormalizeName(cls, hostname):
+     """Validate and normalize the given hostname.
+     @attention: the validation is a bit more relaxed than the standards
+         require; most importantly, we allow underscores in names
+     @raise errors.OpPrereqError: when the name is not valid
+     """
+     hostname = hostname.lower()
+     if (not cls._VALID_NAME_RE.match(hostname) or
+         # double-dots, meaning empty label
+         ".." in hostname or
+         # empty initial label
+         hostname.startswith(".")):
+       raise errors.OpPrereqError("Invalid hostname '%s'" % hostname,
+                                  errors.ECODE_INVAL)
+     if hostname.endswith("."):
+       hostname = hostname.rstrip(".")
+     return hostname
+ 
  
  def GetHostInfo(name=None):
    """Lookup host name and raise an OpPrereqError for failures"""
@@@ -1303,6 -1119,16 +1372,16 @@@ def RemoveHostFromEtcHosts(hostname)
    RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
  
  
+ def TimestampForFilename():
+   """Returns the current time formatted for filenames.
+   The format doesn't contain colons as some shells and applications them as
+   separators.
+   """
+   return time.strftime("%Y-%m-%d_%H_%M_%S")
+ 
+ 
  def CreateBackup(file_name):
    """Creates a backup of a file.
  
      raise errors.ProgrammerError("Can't make a backup of a non-file '%s'" %
                                  file_name)
  
-   prefix = '%s.backup-%d.' % (os.path.basename(file_name), int(time.time()))
+   prefix = ("%s.backup-%s." %
+             (os.path.basename(file_name), TimestampForFilename()))
    dir_name = os.path.dirname(file_name)
  
    fsrc = open(file_name, 'rb')
      (fd, backup_name) = tempfile.mkstemp(prefix=prefix, dir=dir_name)
      fdst = os.fdopen(fd, 'wb')
      try:
+       logging.debug("Backing up %s at %s", file_name, backup_name)
        shutil.copyfileobj(fsrc, fdst)
      finally:
        fdst.close()
@@@ -1430,8 -1258,12 +1511,12 @@@ def ListVisibleFiles(path)
    @param path: the directory to enumerate
    @rtype: list
    @return: the list of all files not starting with a dot
+   @raise ProgrammerError: if L{path} is not an absolute and normalized path
  
    """
+   if not IsNormAbsPath(path):
+     raise errors.ProgrammerError("Path passed to ListVisibleFiles is not"
+                                  " absolute/normalized: '%s'" % path)
    files = [i for i in os.listdir(path) if not i.startswith(".")]
    files.sort()
    return files
@@@ -1659,6 -1491,12 +1744,12 @@@ def any(seq, pred=bool): # pylint: disa
    return False
  
  
+ def partition(seq, pred=bool): # pylint: disable-msg=W0622
+   "Partition a list in two, based on the given predicate"
+   return (list(itertools.ifilter(pred, seq)),
+           list(itertools.ifilterfalse(pred, seq)))
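+ 
+ # Example (illustrative): partition([1, 0, 2], bool) -> ([1, 2], [0])
+ 
+ 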
  def UniqueSequence(seq):
    """Returns a list with unique elements.
  
@@@ -1816,7 -1654,20 +1907,20 @@@ def DaemonPidFileName(name)
        daemon name
  
    """
-   return os.path.join(constants.RUN_GANETI_DIR, "%s.pid" % name)
+   return PathJoin(constants.RUN_GANETI_DIR, "%s.pid" % name)
+ 
+ 
+ def EnsureDaemon(name):
+   """Check for and start daemon if not alive.
+   """
+   result = RunCmd([constants.DAEMON_UTIL, "check-and-start", name])
+   if result.failed:
+     logging.error("Can't start daemon '%s', failure %s, output: %s",
+                   name, result.fail_reason, result.output)
+     return False
+   return True
  
  
  def WritePidFile(name):
@@@ -1944,6 -1795,7 +2048,7 @@@ def FindFile(name, search_path, test=os
      return None
  
    for dir_name in search_path:
+     # FIXME: investigate switch to PathJoin
      item_name = os.path.sep.join([dir_name, name])
      # check the user test and that we're indeed resolving to the given
      # basename
@@@ -2132,6 -1984,36 +2237,36 @@@ def IsNormAbsPath(path)
    return os.path.normpath(path) == path and os.path.isabs(path)
  
  
+ def PathJoin(*args):
+   """Safe-join a list of path components.
+   Requirements:
+       - the first argument must be an absolute path
+       - no component in the path must have backtracking (e.g. /../),
+         since we check for normalization at the end
+   @param args: the path components to be joined
+   @raise ValueError: for invalid paths
+   """
+   # ensure we're having at least one path passed in
+   assert args
+   # ensure the first component is an absolute and normalized path name
+   root = args[0]
+   if not IsNormAbsPath(root):
+     raise ValueError("Invalid parameter to PathJoin: '%s'" % str(args[0]))
+   result = os.path.join(*args)
+   # ensure that the whole path is normalized
+   if not IsNormAbsPath(result):
+     raise ValueError("Invalid parameters to PathJoin: '%s'" % str(args))
+   # check that we're still under the original prefix
+   prefix = os.path.commonprefix([root, result])
+   if prefix != root:
+     raise ValueError("Error: path joining resulted in different prefix"
+                      " (%s != %s)" % (prefix, root))
+   return result
+ 
+ 
  def TailFile(fname, lines=20):
    """Return the last lines from a file.
  
    return rows[-lines:]
  
  
+ def _ParseAsn1Generalizedtime(value):
+   """Parses an ASN1 GENERALIZEDTIME timestamp as used by pyOpenSSL.
+   @type value: string
+   @param value: ASN1 GENERALIZEDTIME timestamp
+   """
+   m = re.match(r"^(\d+)([-+]\d\d)(\d\d)$", value)
+   if m:
+     # We have an offset
+     asn1time = m.group(1)
+     hours = int(m.group(2))
+     minutes = int(m.group(3))
+     utcoffset = (60 * hours) + minutes
+   else:
+     if not value.endswith("Z"):
+       raise ValueError("Missing timezone")
+     asn1time = value[:-1]
+     utcoffset = 0
+   parsed = time.strptime(asn1time, "%Y%m%d%H%M%S")
+   tt = datetime.datetime(*(parsed[:7])) - datetime.timedelta(minutes=utcoffset)
+   return calendar.timegm(tt.utctimetuple())
+ 
+ 
+ def GetX509CertValidity(cert):
+   """Returns the validity period of the certificate.
+   @type cert: OpenSSL.crypto.X509
+   @param cert: X509 certificate object
+   """
+   # The get_notBefore and get_notAfter functions are only supported in
+   # pyOpenSSL 0.7 and above.
+   try:
+     get_notbefore_fn = cert.get_notBefore
+   except AttributeError:
+     not_before = None
+   else:
+     not_before_asn1 = get_notbefore_fn()
+     if not_before_asn1 is None:
+       not_before = None
+     else:
+       not_before = _ParseAsn1Generalizedtime(not_before_asn1)
+   try:
+     get_notafter_fn = cert.get_notAfter
+   except AttributeError:
+     not_after = None
+   else:
+     not_after_asn1 = get_notafter_fn()
+     if not_after_asn1 is None:
+       not_after = None
+     else:
+       not_after = _ParseAsn1Generalizedtime(not_after_asn1)
+   return (not_before, not_after)
+ 
+ 
  def SafeEncode(text):
    """Return a 'safe' version of a source string.
  
@@@ -2272,7 -2217,7 +2470,7 @@@ def CalculateDirectorySize(path)
  
    for (curpath, _, files) in os.walk(path):
      for filename in files:
-       st = os.lstat(os.path.join(curpath, filename))
+       st = os.lstat(PathJoin(curpath, filename))
        size += st.st_size
  
    return BytesToMebibyte(size)
@@@ -2294,15 -2239,15 +2492,15 @@@ def GetFilesystemStats(path)
    return (tsize, fsize)
  
  
- def RunInSeparateProcess(fn):
+ def RunInSeparateProcess(fn, *args):
    """Runs a function in a separate process.
  
    Note: Only boolean return values are supported.
  
    @type fn: callable
    @param fn: Function to be called
-   @rtype: tuple of (int/None, int/None)
-   @return: Exit code and signal number
+   @rtype: bool
+   @return: Function's result
  
    """
    pid = os.fork()
        ResetTempfileModule()
  
        # Call function
-       result = int(bool(fn()))
+       result = int(bool(fn(*args)))
        assert result in (0, 1)
      except: # pylint: disable-msg=W0702
        logging.exception("Error while calling function in separate process")
@@@ -2580,70 -2525,35 +2778,84 @@@ def Retry(fn, delay, timeout, args=None
          wait_fn(current_delay)
  
  
 +def GetClosedTempfile(*args, **kwargs):
 +  """Creates a temporary file and returns its path.
 +
 +  """
 +  (fd, path) = tempfile.mkstemp(*args, **kwargs)
 +  _CloseFDNoErr(fd)
 +  return path
 +
 +
 +def GenerateSelfSignedX509Cert(common_name, validity):
 +  """Generates a self-signed X509 certificate.
 +
 +  @type common_name: string
 +  @param common_name: commonName value
 +  @type validity: int
 +  @param validity: Validity for certificate in seconds
 +
 +  """
 +  # Create private and public key
 +  key = OpenSSL.crypto.PKey()
 +  key.generate_key(OpenSSL.crypto.TYPE_RSA, constants.RSA_KEY_BITS)
 +
 +  # Create self-signed certificate
 +  cert = OpenSSL.crypto.X509()
 +  if common_name:
 +    cert.get_subject().CN = common_name
 +  cert.set_serial_number(1)
 +  cert.gmtime_adj_notBefore(0)
 +  cert.gmtime_adj_notAfter(validity)
 +  cert.set_issuer(cert.get_subject())
 +  cert.set_pubkey(key)
 +  cert.sign(key, constants.X509_CERT_SIGN_DIGEST)
 +
 +  key_pem = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)
 +  cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
 +
 +  return (key_pem, cert_pem)
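A short usage sketch; the common name and the one-hour validity are illustrative values:

    from ganeti import utils

    (key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert("node1.example.com",
                                                           3600)
    # Both values are PEM-encoded strings, e.g. for utils.WriteFile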
 +
 +
 +def GenerateSelfSignedSslCert(filename, validity=(5 * 365)):
 +  """Legacy function to generate self-signed X509 certificate.
 +
 +  """
 +  (key_pem, cert_pem) = GenerateSelfSignedX509Cert(None,
 +                                                   validity * 24 * 60 * 60)
 +
 +  WriteFile(filename, mode=0400, data=key_pem + cert_pem)
 +
 +
  class FileLock(object):
    """Utility class for file locks.
  
    """
-   def __init__(self, filename):
+   def __init__(self, fd, filename):
      """Constructor for FileLock.
  
-     This will open the file denoted by the I{filename} argument.
+     @type fd: file
+     @param fd: File object
      @type filename: str
-     @param filename: path to the file to be locked
+     @param filename: Path of the file opened at I{fd}
  
      """
+     self.fd = fd
      self.filename = filename
-     self.fd = open(self.filename, "w")
+
+   @classmethod
+   def Open(cls, filename):
+     """Creates and opens a file to be used as a file-based lock.
+
+     @type filename: string
+     @param filename: path to the file to be locked
+
+     """
+     # Using "os.open" is necessary to open an existing file read/write while
+     # also creating it if it doesn't exist, without truncating it; the
+     # builtin "open" can't do both, as mode "w" truncates an existing file.
+     return cls(os.fdopen(os.open(filename, os.O_RDWR | os.O_CREAT), "w+"),
+                filename)
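How the split constructor is meant to be used, as a sketch; the lock file path is illustrative:

    from ganeti import utils

    lock = utils.FileLock.Open("/var/lock/ganeti-demo")
    try:
      lock.Exclusive(blocking=True, timeout=10.0)
      # ... critical section ...
      lock.Unlock(blocking=True)
    finally:
      lock.Close()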
  
    def __del__(self):
      self.Close()
      assert self.fd, "Lock was closed"
      assert timeout is None or timeout >= 0, \
        "If specified, timeout must be non-negative"
+     assert not (flag & fcntl.LOCK_NB), "LOCK_NB must not be set"
  
-     if timeout is not None:
+     # When a timeout is used, LOCK_NB must always be set
+     if not (timeout is None and blocking):
        flag |= fcntl.LOCK_NB
-       timeout_end = time.time() + timeout
  
-     # Blocking doesn't have effect with timeout
-     elif not blocking:
-       flag |= fcntl.LOCK_NB
-       timeout_end = None
+     if timeout is None:
+       self._Lock(self.fd, flag, timeout)
+     else:
+       try:
+         Retry(self._Lock, (0.1, 1.2, 1.0), timeout,
+               args=(self.fd, flag, timeout))
+       except RetryTimeout:
+         raise errors.LockError(errmsg)
  
-     # TODO: Convert to utils.Retry
+   @staticmethod
+   def _Lock(fd, flag, timeout):
+     try:
+       fcntl.flock(fd, flag)
+     except IOError, err:
+       if timeout is not None and err.errno == errno.EAGAIN:
+         raise RetryAgain()
  
-     retry = True
-     while retry:
-       try:
-         fcntl.flock(self.fd, flag)
-         retry = False
-       except IOError, err:
-         if err.errno in (errno.EAGAIN, ):
-           if timeout_end is not None and time.time() < timeout_end:
-             # Wait before trying again
-             time.sleep(max(0.1, min(1.0, timeout)))
-           else:
-             raise errors.LockError(errmsg)
-         else:
-           logging.exception("fcntl.flock failed")
-           raise
+       logging.exception("fcntl.flock failed")
+       raise
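The Retry helper driving the timeout above can also be called directly; a sketch, reading the delay tuple as an increasing backoff as the calls in this diff suggest:

    from ganeti import utils

    def _Poll():
      raise utils.RetryAgain()  # "not ready yet"

    try:
      utils.Retry(_Poll, (0.1, 1.2, 1.0), 0.5)
    except utils.RetryTimeout:
      pass  # _Poll never succeeded within the 0.5 second timeout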
  
    def Exclusive(self, blocking=False, timeout=None):
      """Locks the file in exclusive mode.
@@@ -2801,22 -2709,16 +3011,22 @@@ class SignalHandler(object)
    @ivar called: tracks whether any of the signals have been raised
  
    """
 -  def __init__(self, signum):
 +  def __init__(self, signum, handler_fn=None):
      """Constructs a new SignalHandler instance.
  
      @type signum: int or list of ints
      @param signum: Single signal number or set of signal numbers
 +    @type handler_fn: callable
 +    @param handler_fn: Signal handling function
  
      """
 +    assert handler_fn is None or callable(handler_fn)
 +
      self.signum = set(signum)
      self.called = False
  
 +    self._handler_fn = handler_fn
 +
      self._previous = {}
      try:
        for signum in self.signum:
      """
      self.called = False
  
 -  # we don't care about arguments, but we leave them named for the future
 -  def _HandleSignal(self, signum, frame): # pylint: disable-msg=W0613
 +  def _HandleSignal(self, signum, frame):
      """Actual signal handling function.
  
      """
      # solution in Python -- there are no atomic types.
      self.called = True
  
 +    if self._handler_fn:
 +      self._handler_fn(signum, frame)
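A sketch of the new handler_fn hook; the callback body is illustrative:

    import signal

    from ganeti import utils

    def _on_term(signum, frame):
      # Called on top of the handler setting "called" to True
      pass

    handler = utils.SignalHandler([signal.SIGTERM], handler_fn=_on_term)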
 +
  
  class FieldSet(object):
    """A simple field set.
diff --combined qa/ganeti-qa.py
@@@ -88,6 -88,9 +88,9 @@@ def RunClusterTests()
    """Runs tests related to gnt-cluster.
  
    """
+   if qa_config.TestEnabled("cluster-renew-crypto"):
+     RunTest(qa_cluster.TestClusterRenewCrypto)
    if qa_config.TestEnabled('cluster-verify'):
      RunTest(qa_cluster.TestClusterVerify)
  
      RunTest(qa_rapi.TestVersion)
      RunTest(qa_rapi.TestEmptyCluster)
  
  def RunOsTests():
    """Runs all tests related to gnt-os.
  
    RunTest(qa_os.TestOsValid)
    RunTest(qa_os.TestOsInvalid)
    RunTest(qa_os.TestOsPartiallyValid)
+   RunTest(qa_os.TestOsModifyValid)
+   RunTest(qa_os.TestOsModifyInvalid)
  
  
  def RunCommonInstanceTests(instance):
@@@ -318,18 -324,6 +324,18 @@@ def main()
          finally:
            qa_config.ReleaseNode(snode)
  
 +    if (qa_config.TestEnabled('instance-add-plain-disk') and
 +        qa_config.TestEnabled("instance-export")):
 +      instance = RunTest(qa_instance.TestInstanceAddWithPlainDisk, pnode)
 +      expnode = qa_config.AcquireNode(exclude=pnode)
 +      try:
 +        RunTest(qa_instance.TestInstanceExportWithRemove, instance, expnode)
 +        RunTest(qa_instance.TestBackupList, expnode)
 +      finally:
 +        qa_config.ReleaseNode(expnode)
 +      del expnode
 +      del instance
 +
    finally:
      qa_config.ReleaseNode(pnode)
  
diff --combined qa/qa_cluster.py
  
  import tempfile
  
+ from ganeti import constants
 -from ganeti import bootstrap
  from ganeti import utils
  
  import qa_config
  import qa_utils
  import qa_error
  
- from qa_utils import AssertEqual, StartSSH
+ from qa_utils import AssertEqual, AssertNotEqual, StartSSH
  
  
  def _RemoveFileFromAllNodes(filename):
@@@ -144,6 -146,50 +145,50 @@@ def TestClusterVersion()
                         utils.ShellQuoteArgs(cmd)).wait(), 0)
  
  
+ def TestClusterRenewCrypto():
+   """gnt-cluster renew-crypto"""
+   master = qa_config.GetMasterNode()
+
+   # Conflicting options
+   cmd = ["gnt-cluster", "renew-crypto", "--force",
+          "--new-cluster-certificate", "--new-hmac-key",
+          "--new-rapi-certificate", "--rapi-certificate=/dev/null"]
+   AssertNotEqual(StartSSH(master["primary"],
+                           utils.ShellQuoteArgs(cmd)).wait(), 0)
+
+   # Invalid RAPI certificate
+   cmd = ["gnt-cluster", "renew-crypto", "--force",
+          "--rapi-certificate=/dev/null"]
+   AssertNotEqual(StartSSH(master["primary"],
+                           utils.ShellQuoteArgs(cmd)).wait(), 0)
+
+   # Custom RAPI certificate
+   fh = tempfile.NamedTemporaryFile()
+
+   # Ensure certificate doesn't cause "gnt-cluster verify" to complain
+   validity = constants.SSL_CERT_EXPIRATION_WARN * 3
+
 -  bootstrap.GenerateSelfSignedSslCert(fh.name, validity=validity)
++  utils.GenerateSelfSignedSslCert(fh.name, validity=validity)
+
+   tmpcert = qa_utils.UploadFile(master["primary"], fh.name)
+   try:
+     cmd = ["gnt-cluster", "renew-crypto", "--force",
+            "--rapi-certificate=%s" % tmpcert]
+     AssertEqual(StartSSH(master["primary"],
+                          utils.ShellQuoteArgs(cmd)).wait(), 0)
+   finally:
+     cmd = ["rm", "-f", tmpcert]
+     AssertEqual(StartSSH(master["primary"],
+                          utils.ShellQuoteArgs(cmd)).wait(), 0)
+
+   # Normal case
+   cmd = ["gnt-cluster", "renew-crypto", "--force",
+          "--new-cluster-certificate", "--new-hmac-key",
+          "--new-rapi-certificate"]
+   AssertEqual(StartSSH(master["primary"],
+                        utils.ShellQuoteArgs(cmd)).wait(), 0)
+
+
  def TestClusterBurnin():
    """Burnin"""
    master = qa_config.GetMasterNode()
@@@ -27,6 -27,7 +27,7 @@@ import tim
  import tempfile
  import os.path
  import os
+ import stat
  import md5
  import signal
  import socket
@@@ -34,8 -35,10 +35,11 @@@ import shuti
  import re
  import select
  import string
 +import fcntl
  import OpenSSL
+ import warnings
+ import distutils.version
+ import glob
  
  import ganeti
  import testutils
@@@ -48,10 -51,10 +52,10 @@@ from ganeti.utils import IsProcessAlive
       ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
       SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
       TailFile, ForceDictType, SafeEncode, IsNormAbsPath, FormatTime, \
-      UnescapeAndSplit
+      UnescapeAndSplit, RunParts, PathJoin, HostInfo
  
  from ganeti.errors import LockError, UnitParseError, GenericError, \
-      ProgrammerError
+      ProgrammerError, OpPrereqError
  
  
  class TestIsProcessAlive(unittest.TestCase):
@@@ -233,156 -236,137 +237,286 @@@ class TestRunCmd(testutils.GanetiTestCa
      cwd = os.getcwd()
      self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
  
+   def testResetEnv(self):
+     """Test environment reset functionality"""
+     self.failUnlessEqual(RunCmd(["env"], reset_env=True).stdout.strip(), "")
+
+
+ class TestRunParts(unittest.TestCase):
+   """Testing case for the RunParts function"""
+
+   def setUp(self):
+     self.rundir = tempfile.mkdtemp(prefix="ganeti-test", suffix=".tmp")
+
+   def tearDown(self):
+     shutil.rmtree(self.rundir)
+
+   def testEmpty(self):
+     """Test on an empty dir"""
+     self.failUnlessEqual(RunParts(self.rundir, reset_env=True), [])
+
+   def testSkipWrongName(self):
+     """Test that wrong files are skipped"""
+     fname = os.path.join(self.rundir, "00test.dot")
+     utils.WriteFile(fname, data="")
+     os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
+     relname = os.path.basename(fname)
+     self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
+                          [(relname, constants.RUNPARTS_SKIP, None)])
+
+   def testSkipNonExec(self):
+     """Test that non executable files are skipped"""
+     fname = os.path.join(self.rundir, "00test")
+     utils.WriteFile(fname, data="")
+     relname = os.path.basename(fname)
+     self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
+                          [(relname, constants.RUNPARTS_SKIP, None)])
+
+   def testError(self):
+     """Test error on a broken executable"""
+     fname = os.path.join(self.rundir, "00test")
+     utils.WriteFile(fname, data="")
+     os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
+     (relname, status, error) = RunParts(self.rundir, reset_env=True)[0]
+     self.failUnlessEqual(relname, os.path.basename(fname))
+     self.failUnlessEqual(status, constants.RUNPARTS_ERR)
+     self.failUnless(error)
+
+   def testSorted(self):
+     """Test executions are sorted"""
+     files = []
+     files.append(os.path.join(self.rundir, "64test"))
+     files.append(os.path.join(self.rundir, "00test"))
+     files.append(os.path.join(self.rundir, "42test"))
+     for fname in files:
+       utils.WriteFile(fname, data="")
+     results = RunParts(self.rundir, reset_env=True)
+     for fname in sorted(files):
+       self.failUnlessEqual(os.path.basename(fname), results.pop(0)[0])
+
+   def testOk(self):
+     """Test correct execution"""
+     fname = os.path.join(self.rundir, "00test")
+     utils.WriteFile(fname, data="#!/bin/sh\n\necho -n ciao")
+     os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
+     (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
+     self.failUnlessEqual(relname, os.path.basename(fname))
+     self.failUnlessEqual(status, constants.RUNPARTS_RUN)
+     self.failUnlessEqual(runresult.stdout, "ciao")
+
+   def testRunFail(self):
+     """Test correct execution, with run failure"""
+     fname = os.path.join(self.rundir, "00test")
+     utils.WriteFile(fname, data="#!/bin/sh\n\nexit 1")
+     os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
+     (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
+     self.failUnlessEqual(relname, os.path.basename(fname))
+     self.failUnlessEqual(status, constants.RUNPARTS_RUN)
+     self.failUnlessEqual(runresult.exit_code, 1)
+     self.failUnless(runresult.failed)
+
+   def testRunMix(self):
+     files = []
+     files.append(os.path.join(self.rundir, "00test"))
+     files.append(os.path.join(self.rundir, "42test"))
+     files.append(os.path.join(self.rundir, "64test"))
+     files.append(os.path.join(self.rundir, "99test"))
+     files.sort()
+     # 1st has errors in execution
+     utils.WriteFile(files[0], data="#!/bin/sh\n\nexit 1")
+     os.chmod(files[0], stat.S_IREAD | stat.S_IEXEC)
+     # 2nd is skipped
+     utils.WriteFile(files[1], data="")
+     # 3rd cannot execute properly
+     utils.WriteFile(files[2], data="")
+     os.chmod(files[2], stat.S_IREAD | stat.S_IEXEC)
+     # 4th execs
+     utils.WriteFile(files[3], data="#!/bin/sh\n\necho -n ciao")
+     os.chmod(files[3], stat.S_IREAD | stat.S_IEXEC)
+     results = RunParts(self.rundir, reset_env=True)
+     (relname, status, runresult) = results[0]
+     self.failUnlessEqual(relname, os.path.basename(files[0]))
+     self.failUnlessEqual(status, constants.RUNPARTS_RUN)
+     self.failUnlessEqual(runresult.exit_code, 1)
+     self.failUnless(runresult.failed)
+     (relname, status, runresult) = results[1]
+     self.failUnlessEqual(relname, os.path.basename(files[1]))
+     self.failUnlessEqual(status, constants.RUNPARTS_SKIP)
+     self.failUnlessEqual(runresult, None)
+     (relname, status, runresult) = results[2]
+     self.failUnlessEqual(relname, os.path.basename(files[2]))
+     self.failUnlessEqual(status, constants.RUNPARTS_ERR)
+     self.failUnless(runresult)
+     (relname, status, runresult) = results[3]
+     self.failUnlessEqual(relname, os.path.basename(files[3]))
+     self.failUnlessEqual(status, constants.RUNPARTS_RUN)
+     self.failUnlessEqual(runresult.output, "ciao")
+     self.failUnlessEqual(runresult.exit_code, 0)
+     self.failUnless(not runresult.failed)
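Piecing together what these tests pin down, RunParts yields (relative name, status, result) tuples; a sketch with an illustrative directory:

    from ganeti import constants
    from ganeti import utils

    for (relname, status, runresult) in utils.RunParts("/etc/ganeti/parts",
                                                       reset_env=True):
      if status == constants.RUNPARTS_RUN and runresult.failed:
        print "%s failed with exit code %s" % (relname, runresult.exit_code)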
  
 +class TestStartDaemon(testutils.GanetiTestCase):
 +  def setUp(self):
 +    self.tmpdir = tempfile.mkdtemp(prefix="ganeti-test")
 +    self.tmpfile = os.path.join(self.tmpdir, "test")
 +
 +  def tearDown(self):
 +    shutil.rmtree(self.tmpdir)
 +
 +  def testShell(self):
 +    utils.StartDaemon("echo Hello World > %s" % self.tmpfile)
 +    self._wait(self.tmpfile, 60.0, "Hello World")
 +
 +  def testShellOutput(self):
 +    utils.StartDaemon("echo Hello World", output=self.tmpfile)
 +    self._wait(self.tmpfile, 60.0, "Hello World")
 +
 +  def testNoShellNoOutput(self):
 +    utils.StartDaemon(["pwd"])
 +
 +  def testNoShellNoOutputTouch(self):
 +    testfile = os.path.join(self.tmpdir, "check")
 +    self.failIf(os.path.exists(testfile))
 +    utils.StartDaemon(["touch", testfile])
 +    self._wait(testfile, 60.0, "")
 +
 +  def testNoShellOutput(self):
 +    utils.StartDaemon(["pwd"], output=self.tmpfile)
 +    self._wait(self.tmpfile, 60.0, "/")
 +
 +  def testNoShellOutputCwd(self):
 +    utils.StartDaemon(["pwd"], output=self.tmpfile, cwd=os.getcwd())
 +    self._wait(self.tmpfile, 60.0, os.getcwd())
 +
 +  def testShellEnv(self):
 +    utils.StartDaemon("echo \"$GNT_TEST_VAR\"", output=self.tmpfile,
 +                      env={ "GNT_TEST_VAR": "Hello World", })
 +    self._wait(self.tmpfile, 60.0, "Hello World")
 +
 +  def testNoShellEnv(self):
 +    utils.StartDaemon(["printenv", "GNT_TEST_VAR"], output=self.tmpfile,
 +                      env={ "GNT_TEST_VAR": "Hello World", })
 +    self._wait(self.tmpfile, 60.0, "Hello World")
 +
 +  def testOutputFd(self):
 +    fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
 +    try:
 +      utils.StartDaemon(["pwd"], output_fd=fd, cwd=os.getcwd())
 +    finally:
 +      os.close(fd)
 +    self._wait(self.tmpfile, 60.0, os.getcwd())
 +
 +  def testPid(self):
 +    pid = utils.StartDaemon("echo $$ > %s" % self.tmpfile)
 +    self._wait(self.tmpfile, 60.0, str(pid))
 +
 +  def testPidFile(self):
 +    pidfile = os.path.join(self.tmpdir, "pid")
 +    checkfile = os.path.join(self.tmpdir, "abort")
 +
 +    pid = utils.StartDaemon("while sleep 5; do :; done", pidfile=pidfile,
 +                            output=self.tmpfile)
 +    try:
 +      fd = os.open(pidfile, os.O_RDONLY)
 +      try:
 +        # Check file is locked
 +        self.assertRaises(errors.LockError, utils.LockFile, fd)
 +
 +        pidtext = os.read(fd, 100)
 +      finally:
 +        os.close(fd)
 +
 +      self.assertEqual(int(pidtext.strip()), pid)
 +
 +      self.assert_(utils.IsProcessAlive(pid))
 +    finally:
 +      # No matter what happens, kill daemon
 +      utils.KillProcess(pid, timeout=5.0, waitpid=False)
 +      self.failIf(utils.IsProcessAlive(pid))
 +
 +    self.assertEqual(utils.ReadFile(self.tmpfile), "")
 +
 +  def _wait(self, path, timeout, expected):
 +    # Due to the asynchronous nature of daemon processes, polling is necessary.
 +    # A timeout makes sure the test doesn't hang forever.
 +    def _CheckFile():
 +      if not (os.path.isfile(path) and
 +              utils.ReadFile(path).strip() == expected):
 +        raise utils.RetryAgain()
 +
 +    try:
 +      utils.Retry(_CheckFile, (0.01, 1.5, 1.0), timeout)
 +    except utils.RetryTimeout:
 +      self.fail("Apparently the daemon didn't run in %s seconds and/or"
 +                " didn't write the correct output" % timeout)
 +
 +  def testError(self):
 +    self.assertRaises(errors.OpExecError, utils.StartDaemon,
 +                      ["./does-NOT-EXIST/here/0123456789"])
 +    self.assertRaises(errors.OpExecError, utils.StartDaemon,
 +                      ["./does-NOT-EXIST/here/0123456789"],
 +                      output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
 +    self.assertRaises(errors.OpExecError, utils.StartDaemon,
 +                      ["./does-NOT-EXIST/here/0123456789"],
 +                      cwd=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
 +    self.assertRaises(errors.OpExecError, utils.StartDaemon,
 +                      ["./does-NOT-EXIST/here/0123456789"],
 +                      output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
 +
 +    fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
 +    try:
 +      self.assertRaises(errors.ProgrammerError, utils.StartDaemon,
 +                        ["./does-NOT-EXIST/here/0123456789"],
 +                        output=self.tmpfile, output_fd=fd)
 +    finally:
 +      os.close(fd)
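From the cases above, a minimal StartDaemon invocation looks as follows; the output path is illustrative:

    from ganeti import utils

    pid = utils.StartDaemon(["pwd"], output="/tmp/daemon.log", cwd="/")
    # "pid" is the process ID of the detached daemon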
 +
 +
 +class TestSetCloseOnExecFlag(unittest.TestCase):
 +  """Tests for SetCloseOnExecFlag"""
 +
 +  def setUp(self):
 +    self.tmpfile = tempfile.TemporaryFile()
 +
 +  def testEnable(self):
 +    utils.SetCloseOnExecFlag(self.tmpfile.fileno(), True)
 +    self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
 +                    fcntl.FD_CLOEXEC)
 +
 +  def testDisable(self):
 +    utils.SetCloseOnExecFlag(self.tmpfile.fileno(), False)
 +    self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
 +                fcntl.FD_CLOEXEC)
 +
 +
 +class TestSetNonblockFlag(unittest.TestCase):
 +  def setUp(self):
 +    self.tmpfile = tempfile.TemporaryFile()
 +
 +  def testEnable(self):
 +    utils.SetNonblockFlag(self.tmpfile.fileno(), True)
 +    self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
 +                    os.O_NONBLOCK)
 +
 +  def testDisable(self):
 +    utils.SetNonblockFlag(self.tmpfile.fileno(), False)
 +    self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
 +                os.O_NONBLOCK)
 +
 +
  class TestRemoveFile(unittest.TestCase):
    """Test case for the RemoveFile function"""
  
        os.unlink(self.tmpfile)
      os.rmdir(self.tmpdir)
  
 -
    def testIgnoreDirs(self):
      """Test that RemoveFile() ignores directories"""
      self.assertEqual(None, RemoveFile(self.tmpdir))
  
 -
    def testIgnoreNotExisting(self):
      """Test that RemoveFile() ignores non-existing files"""
      RemoveFile(self.tmpfile)
      RemoveFile(self.tmpfile)
  
 -
    def testRemoveFile(self):
      """Test that RemoveFile does remove a file"""
      RemoveFile(self.tmpfile)
      if os.path.exists(self.tmpfile):
        self.fail("File '%s' not removed" % self.tmpfile)
  
 -
    def testRemoveSymlink(self):
      """Test that RemoveFile does remove symlinks"""
      symlink = self.tmpdir + "/symlink"
@@@ -526,6 -514,55 +660,55 @@@ class TestMatchNameComponent(unittest.T
                       None)
  
  
+ class TestTimestampForFilename(unittest.TestCase):
+   def test(self):
+     self.assert_("." not in utils.TimestampForFilename())
+     self.assert_(":" not in utils.TimestampForFilename())
+
+
+ class TestCreateBackup(testutils.GanetiTestCase):
+   def setUp(self):
+     testutils.GanetiTestCase.setUp(self)
+     self.tmpdir = tempfile.mkdtemp()
+
+   def tearDown(self):
+     testutils.GanetiTestCase.tearDown(self)
+     shutil.rmtree(self.tmpdir)
+
+   def testEmpty(self):
+     filename = utils.PathJoin(self.tmpdir, "config.data")
+     utils.WriteFile(filename, data="")
+     bname = utils.CreateBackup(filename)
+     self.assertFileContent(bname, "")
+     self.assertEqual(len(glob.glob("%s*" % filename)), 2)
+     utils.CreateBackup(filename)
+     self.assertEqual(len(glob.glob("%s*" % filename)), 3)
+     utils.CreateBackup(filename)
+     self.assertEqual(len(glob.glob("%s*" % filename)), 4)
+
+     fifoname = utils.PathJoin(self.tmpdir, "fifo")
+     os.mkfifo(fifoname)
+     self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
+
+   def testContent(self):
+     bkpcount = 0
+     for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
+       for rep in [1, 2, 10, 127]:
+         testdata = data * rep
+
+         filename = utils.PathJoin(self.tmpdir, "test.data_")
+         utils.WriteFile(filename, data=testdata)
+         self.assertFileContent(filename, testdata)
+
+         for _ in range(3):
+           bname = utils.CreateBackup(filename)
+           bkpcount += 1
+           self.assertFileContent(bname, testdata)
+           self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
+
+
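As the glob assertions show, CreateBackup places a timestamped copy next to the original and returns its name; a sketch with an illustrative path:

    from ganeti import utils

    backup_name = utils.CreateBackup("/var/lib/ganeti/config.data")
    # "backup_name" points at the freshly created copy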
  class TestFormatUnit(unittest.TestCase):
    """Test case for the FormatUnit function"""
  
@@@ -913,6 -950,13 +1096,13 @@@ class TestListVisibleFiles(unittest.Tes
      expected = ["a", "b"]
      self._test(files, expected)
  
+   def testNonAbsolutePath(self):
+     self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles, "abc")
+
+   def testNonNormalizedPath(self):
+     self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles,
+                           "/bin/../tmp")
+
  
  class TestNewUUID(unittest.TestCase):
    """Test case for NewUUID"""
@@@ -1001,13 -1045,9 +1191,9 @@@ class TestTailFile(testutils.GanetiTest
        self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
  
  
- class TestFileLock(unittest.TestCase):
+ class _BaseFileLockTest:
    """Test case for the FileLock class"""
  
-   def setUp(self):
-     self.tmpfile = tempfile.NamedTemporaryFile()
-     self.lock = utils.FileLock(self.tmpfile.name)
    def testSharedNonblocking(self):
      self.lock.Shared(blocking=False)
      self.lock.Close()
      self.lock.Unlock(blocking=False)
      self.lock.Close()
  
+   def testSimpleTimeout(self):
+     # These will succeed on the first attempt, hence a short timeout
+     self.lock.Shared(blocking=True, timeout=10.0)
+     self.lock.Exclusive(blocking=False, timeout=10.0)
+     self.lock.Unlock(blocking=True, timeout=10.0)
+     self.lock.Close()
+
+   @staticmethod
+   def _TryLockInner(filename, shared, blocking):
+     lock = utils.FileLock.Open(filename)
+     if shared:
+       fn = lock.Shared
+     else:
+       fn = lock.Exclusive
+     try:
+       # The timeout doesn't really matter as the parent process waits for us to
+       # finish anyway.
+       fn(blocking=blocking, timeout=0.01)
+     except errors.LockError, err:
+       return False
+     return True
+
+   def _TryLock(self, *args):
+     return utils.RunInSeparateProcess(self._TryLockInner, self.tmpfile.name,
+                                       *args)
+
+   def testTimeout(self):
+     for blocking in [True, False]:
+       self.lock.Exclusive(blocking=True)
+       self.failIf(self._TryLock(False, blocking))
+       self.failIf(self._TryLock(True, blocking))
+
+       self.lock.Shared(blocking=True)
+       self.assert_(self._TryLock(True, blocking))
+       self.failIf(self._TryLock(False, blocking))
+
    def testCloseShared(self):
      self.lock.Close()
      self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
      self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
  
  
+ class TestFileLockWithFilename(testutils.GanetiTestCase, _BaseFileLockTest):
+   TESTDATA = "Hello World\n" * 10
+
+   def setUp(self):
+     testutils.GanetiTestCase.setUp(self)
+
+     self.tmpfile = tempfile.NamedTemporaryFile()
+     utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
+     self.lock = utils.FileLock.Open(self.tmpfile.name)
+
+     # Ensure "Open" didn't truncate file
+     self.assertFileContent(self.tmpfile.name, self.TESTDATA)
+
+   def tearDown(self):
+     self.assertFileContent(self.tmpfile.name, self.TESTDATA)
+
+     testutils.GanetiTestCase.tearDown(self)
+
+
+ class TestFileLockWithFileObject(unittest.TestCase, _BaseFileLockTest):
+   def setUp(self):
+     self.tmpfile = tempfile.NamedTemporaryFile()
+     self.lock = utils.FileLock(open(self.tmpfile.name, "w"), self.tmpfile.name)
+
+
  class TestTimeFunctions(unittest.TestCase):
    """Test case for time functions"""
  
@@@ -1148,7 -1252,7 +1398,7 @@@ class TestForceDictType(unittest.TestCa
  
  
  class TestIsAbsNormPath(unittest.TestCase):
 -  """Testing case for IsProcessAlive"""
 +  """Testing case for IsNormAbsPath"""
  
    def _pathTestHelper(self, path, result):
      if result:
@@@ -1209,6 -1313,13 +1459,13 @@@ class RunInSeparateProcess(unittest.Tes
  
        self.assertEqual(exp, utils.RunInSeparateProcess(_child))
  
+   def testArgs(self):
+     for arg in [0, 1, 999, "Hello World", (1, 2, 3)]:
+       def _child(carg1, carg2):
+         return carg1 == "Foo" and carg2 == arg
+
+       self.assert_(utils.RunInSeparateProcess(_child, "Foo", arg))
+
    def testPid(self):
      parent_pid = os.getpid()
  
@@@ -1276,52 -1387,121 +1533,168 @@@ class TestUnescapeAndSplit(unittest.Tes
        self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
  
  
 +class TestGenerateSelfSignedX509Cert(unittest.TestCase):
 +  def setUp(self):
 +    self.tmpdir = tempfile.mkdtemp()
 +
 +  def tearDown(self):
 +    shutil.rmtree(self.tmpdir)
 +
 +  def _checkRsaPrivateKey(self, key):
 +    lines = key.splitlines()
 +    return ("-----BEGIN RSA PRIVATE KEY-----" in lines and
 +            "-----END RSA PRIVATE KEY-----" in lines)
 +
 +  def _checkCertificate(self, cert):
 +    lines = cert.splitlines()
 +    return ("-----BEGIN CERTIFICATE-----" in lines and
 +            "-----END CERTIFICATE-----" in lines)
 +
 +  def test(self):
 +    for common_name in [None, ".", "Ganeti", "node1.example.com"]:
 +      (key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300)
 +      self._checkRsaPrivateKey(key_pem)
 +      self._checkCertificate(cert_pem)
 +
 +      key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
 +                                           key_pem)
 +      self.assert_(key.bits() >= 1024)
 +      self.assertEqual(key.bits(), constants.RSA_KEY_BITS)
 +      self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA)
 +
 +      x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
 +                                             cert_pem)
 +      self.failIf(x509.has_expired())
 +      self.assertEqual(x509.get_issuer().CN, common_name)
 +      self.assertEqual(x509.get_subject().CN, common_name)
 +      self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS)
 +
 +  def testLegacy(self):
 +    cert1_filename = os.path.join(self.tmpdir, "cert1.pem")
 +
 +    utils.GenerateSelfSignedSslCert(cert1_filename, validity=1)
 +
 +    cert1 = utils.ReadFile(cert1_filename)
 +
 +    self.assert_(self._checkRsaPrivateKey(cert1))
 +    self.assert_(self._checkCertificate(cert1))
 +
 +
+ class TestPathJoin(unittest.TestCase):
+   """Testing case for PathJoin"""
+
+   def testBasicItems(self):
+     mlist = ["/a", "b", "c"]
+     self.failUnlessEqual(PathJoin(*mlist), "/".join(mlist))
+
+   def testNonAbsPrefix(self):
+     self.failUnlessRaises(ValueError, PathJoin, "a", "b")
+
+   def testBackTrack(self):
+     self.failUnlessRaises(ValueError, PathJoin, "/a", "b/../c")
+
+   def testMultiAbs(self):
+     self.failUnlessRaises(ValueError, PathJoin, "/a", "/b")
+
+
+ class TestHostInfo(unittest.TestCase):
+   """Testing case for HostInfo"""
+
+   def testUppercase(self):
+     data = "AbC.example.com"
+     self.failUnlessEqual(HostInfo.NormalizeName(data), data.lower())
+
+   def testTooLongName(self):
+     data = "a.b." + "c" * 255
+     self.failUnlessRaises(OpPrereqError, HostInfo.NormalizeName, data)
+
+   def testTrailingDot(self):
+     data = "a.b.c"
+     self.failUnlessEqual(HostInfo.NormalizeName(data + "."), data)
+
+   def testInvalidName(self):
+     data = [
+       "a b",
+       "a/b",
+       ".a.b",
+       "a..b",
+       ]
+     for value in data:
+       self.failUnlessRaises(OpPrereqError, HostInfo.NormalizeName, value)
+
+   def testValidName(self):
+     data = [
+       "a.b",
+       "a-b",
+       "a_b",
+       "a.b.c",
+       ]
+     for value in data:
+       HostInfo.NormalizeName(value)
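The normalization rules pinned down above, in one sketch:

    from ganeti.utils import HostInfo

    HostInfo.NormalizeName("Node1.Example.Com.")  # -> "node1.example.com"
    HostInfo.NormalizeName("a b")                 # raises OpPrereqError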
+
+
+ class TestParseAsn1Generalizedtime(unittest.TestCase):
+   def test(self):
+     # UTC
+     self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000Z"), 0)
+     self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152Z"),
+                      1266860512)
+     self.assertEqual(utils._ParseAsn1Generalizedtime("20380119031407Z"),
+                      (2**31) - 1)
+
+     # With offset
+     self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152+0000"),
+                      1266860512)
+     self.assertEqual(utils._ParseAsn1Generalizedtime("20100223131652+0000"),
+                      1266931012)
+     self.assertEqual(utils._ParseAsn1Generalizedtime("20100223051808-0800"),
+                      1266931088)
+     self.assertEqual(utils._ParseAsn1Generalizedtime("20100224002135+1100"),
+                      1266931295)
+     self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000-0100"),
+                      3600)
+
+     # Leap seconds are not supported by datetime.datetime
+     self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
+                       "19841231235960+0000")
+     self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
+                       "19920630235960+0000")
+
+     # Errors
+     self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "")
+     self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "invalid")
+     self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
+                       "20100222174152")
+     self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
+                       "Mon Feb 22 17:47:02 UTC 2010")
+     self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
+                       "2010-02-22 17:42:02")
+
+
+ class TestGetX509CertValidity(testutils.GanetiTestCase):
+   def setUp(self):
+     testutils.GanetiTestCase.setUp(self)
+
+     pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__)
+
+     # Test whether we have pyOpenSSL 0.7 or above
+     self.pyopenssl0_7 = (pyopenssl_version >= "0.7")
+
+     if not self.pyopenssl0_7:
+       warnings.warn("This test requires pyOpenSSL 0.7 or above to"
+                     " function correctly")
+
+   def _LoadCert(self, name):
+     return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
+                                            self._ReadTestData(name))
+
+   def test(self):
+     validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))
+     if self.pyopenssl0_7:
+       self.assertEqual(validity, (1266919967, 1267524767))
+     else:
+       self.assertEqual(validity, (None, None))
+
+
  if __name__ == '__main__':
    testutils.GanetiTestProgram()