Add RPC calls to import and export instance data
authorMichael Hanselmann <hansmi@google.com>
Fri, 16 Apr 2010 12:47:47 +0000 (14:47 +0200)
committerMichael Hanselmann <hansmi@google.com>
Fri, 16 Apr 2010 13:24:35 +0000 (15:24 +0200)
These RPC calls can be used to start, monitor and stop the instance data
import/export daemon.

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>

daemons/ganeti-noded
lib/backend.py
lib/constants.py
lib/rpc.py

index 922d315..7a5ab7a 100755 (executable)
@@ -93,6 +93,21 @@ def _RequireJobQueueLock(fn):
   return wrapper
 
 
+def _DecodeImportExportIO(ieio, ieioargs):
+  """Decodes import/export I/O information.
+
+  """
+  if ieio == constants.IEIO_RAW_DISK:
+    assert len(ieioargs) == 1
+    return (objects.Disk.FromDict(ieioargs[0]), )
+
+  if ieio == constants.IEIO_SCRIPT:
+    assert len(ieioargs) == 2
+    return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
+
+  return ieioargs
+
+
 class NodeHttpServer(http.server.HttpServer):
   """The server implementation.
 
@@ -838,6 +853,50 @@ class NodeHttpServer(http.server.HttpServer):
     (name, ) = params
     return backend.RemoveX509Certificate(name)
 
+  # Import and export
+
+  @staticmethod
+  def perspective_start_import_listener(params):
+    """Starts an import daemon.
+
+    """
+    (x509_key_name, source_x509_ca, instance, dest, dest_args) = params
+    return backend.StartImportExportDaemon(constants.IEM_IMPORT,
+                                           x509_key_name, source_x509_ca,
+                                           None, None,
+                                           objects.Instance.FromDict(instance),
+                                           dest,
+                                           _DecodeImportExportIO(dest,
+                                                                 dest_args))
+  @staticmethod
+  def perspective_start_export(params):
+    """Starts an export daemon.
+
+    """
+    (x509_key_name, dest_x509_ca, host, port, instance,
+     source, source_args) = params
+    return backend.StartImportExportDaemon(constants.IEM_EXPORT,
+                                           x509_key_name, dest_x509_ca,
+                                           host, port,
+                                           objects.Instance.FromDict(instance),
+                                           source,
+                                           _DecodeImportExportIO(source,
+                                                                 source_args))
+
+  @staticmethod
+  def perspective_get_import_export_status(params):
+    """Retrieves the status of an import or export daemon.
+
+    """
+    return backend.GetImportExportStatus(params[0])
+
+  @staticmethod
+  def perspective_cleanup_import_export(params):
+    """Cleans up after an import or export.
+
+    """
+    return backend.CleanupImportExport(params[0])
+
 
 def CheckNoded(_, args):
   """Initial checks whether to run or exit with a failure.
@@ -889,6 +948,7 @@ def main():
   dirs.append((constants.LOG_OS_DIR, 0750))
   dirs.append((constants.LOCK_DIR, 1777))
   dirs.append((constants.CRYPTO_KEYS_DIR, constants.CRYPTO_KEYS_DIR_MODE))
+  dirs.append((constants.IMPORT_EXPORT_DIR, constants.IMPORT_EXPORT_DIR_MODE))
   daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded,
                      default_ssl_cert=constants.NODED_CERT_FILE,
                      default_ssl_key=constants.NODED_CERT_FILE)
index 1271681..29a2c5b 100644 (file)
@@ -56,6 +56,7 @@ from ganeti import constants
 from ganeti import bdev
 from ganeti import objects
 from ganeti import ssconf
+from ganeti import serializer
 
 
 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
@@ -68,6 +69,9 @@ _ALLOWED_CLEAN_DIRS = frozenset([
 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
 _X509_KEY_FILE = "key"
 _X509_CERT_FILE = "cert"
+_IES_STATUS_FILE = "status"
+_IES_PID_FILE = "pid"
+_IES_CA_FILE = "ca"
 
 
 class RPCFail(Exception):
@@ -832,6 +836,7 @@ def _InstanceLogName(kind, os_name, instance):
   @param instance: the name of the instance being imported/added/etc.
 
   """
+  # TODO: Use tempfile.mkstemp to create unique filename
   base = ("%s-%s-%s-%s.log" %
           (kind, os_name, instance, utils.TimestampForFilename()))
   return utils.PathJoin(constants.LOG_OS_DIR, base)
@@ -2597,6 +2602,282 @@ def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
           cert_dir, err)
 
 
+def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
+  """Returns the command for the requested input/output.
+
+  @type instance: L{objects.Instance}
+  @param instance: The instance object
+  @param mode: Import/export mode
+  @param ieio: Input/output type
+  @param ieargs: Input/output arguments
+
+  """
+  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
+
+  env = None
+  prefix = None
+  suffix = None
+
+  if ieio == constants.IEIO_FILE:
+    (filename, ) = ieargs
+
+    if not utils.IsNormAbsPath(filename):
+      _Fail("Path '%s' is not normalized or absolute", filename)
+
+    directory = os.path.normpath(os.path.dirname(filename))
+
+    if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
+        constants.EXPORT_DIR):
+      _Fail("File '%s' is not under exports directory '%s'",
+            filename, constants.EXPORT_DIR)
+
+    # Create directory
+    utils.Makedirs(directory, mode=0750)
+
+    quoted_filename = utils.ShellQuote(filename)
+
+    if mode == constants.IEM_IMPORT:
+      suffix = "> %s" % quoted_filename
+    elif mode == constants.IEM_EXPORT:
+      suffix = "< %s" % quoted_filename
+
+  elif ieio == constants.IEIO_RAW_DISK:
+    (disk, ) = ieargs
+
+    real_disk = _OpenRealBD(disk)
+
+    if mode == constants.IEM_IMPORT:
+      # we set here a smaller block size as, due to transport buffering, more
+      # than 64-128k will mostly ignored; we use nocreat to fail if the device
+      # is not already there or we pass a wrong path; we use notrunc to no
+      # attempt truncate on an LV device; we use oflag=dsync to not buffer too
+      # much memory; this means that at best, we flush every 64k, which will
+      # not be very fast
+      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
+                                    " bs=%s oflag=dsync"),
+                                    real_disk.dev_path,
+                                    str(64 * 1024))
+
+    elif mode == constants.IEM_EXPORT:
+      # the block size on the read dd is 1MiB to match our units
+      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
+                                   real_disk.dev_path,
+                                   str(1024 * 1024), # 1 MB
+                                   str(disk.size))
+
+  elif ieio == constants.IEIO_SCRIPT:
+    (disk, disk_index, ) = ieargs
+
+    assert isinstance(disk_index, (int, long))
+
+    real_disk = _OpenRealBD(disk)
+
+    inst_os = OSFromDisk(instance.os)
+    env = OSEnvironment(instance, inst_os)
+
+    if mode == constants.IEM_IMPORT:
+      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
+      env["IMPORT_INDEX"] = str(disk_index)
+      script = inst_os.import_script
+
+    elif mode == constants.IEM_EXPORT:
+      env["EXPORT_DEVICE"] = real_disk.dev_path
+      env["EXPORT_INDEX"] = str(disk_index)
+      script = inst_os.export_script
+
+    # TODO: Pass special environment only to script
+    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
+
+    if mode == constants.IEM_IMPORT:
+      suffix = "| %s" % script_cmd
+
+    elif mode == constants.IEM_EXPORT:
+      prefix = "%s |" % script_cmd
+
+  else:
+    _Fail("Invalid %s I/O mode %r", mode, ieio)
+
+  return (env, prefix, suffix)
+
+
+def _CreateImportExportStatusDir(prefix):
+  """Creates status directory for import/export.
+
+  """
+  return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
+                          prefix=("%s-%s-" %
+                                  (prefix, utils.TimestampForFilename())))
+
+
+def StartImportExportDaemon(mode, key_name, ca, host, port, instance,
+                            ieio, ieioargs):
+  """Starts an import or export daemon.
+
+  @param mode: Import/output mode
+  @type key_name: string
+  @param key_name: RSA key name (None to use cluster certificate)
+  @type ca: string:
+  @param ca: Remote CA in PEM format (None to use cluster certificate)
+  @type host: string
+  @param host: Remote host for export (None for import)
+  @type port: int
+  @param port: Remote port for export (None for import)
+  @type instance: L{objects.Instance}
+  @param instance: Instance object
+  @param ieio: Input/output type
+  @param ieioargs: Input/output arguments
+
+  """
+  if mode == constants.IEM_IMPORT:
+    prefix = "import"
+
+    if not (host is None and port is None):
+      _Fail("Can not specify host or port on import")
+
+  elif mode == constants.IEM_EXPORT:
+    prefix = "export"
+
+    if host is None or port is None:
+      _Fail("Host and port must be specified for an export")
+
+  else:
+    _Fail("Invalid mode %r", mode)
+
+  if (key_name is None) ^ (ca is None):
+    _Fail("Cluster certificate can only be used for both key and CA")
+
+  (cmd_env, cmd_prefix, cmd_suffix) = \
+    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
+
+  if key_name is None:
+    # Use server.pem
+    key_path = constants.NODED_CERT_FILE
+    cert_path = constants.NODED_CERT_FILE
+    assert ca is None
+  else:
+    (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
+                                                 key_name)
+    assert ca is not None
+
+  status_dir = _CreateImportExportStatusDir(prefix)
+  try:
+    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
+    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
+
+    if ca is None:
+      # Use server.pem
+      # TODO: If socat runs as a non-root user, this might need to be copied to
+      # a separate file
+      ca_path = constants.NODED_CERT_FILE
+    else:
+      ca_path = utils.PathJoin(status_dir, _IES_CA_FILE)
+      utils.WriteFile(ca_path, data=ca, mode=0400)
+
+    cmd = [
+      constants.IMPORT_EXPORT_DAEMON,
+      status_file, mode,
+      "--key=%s" % key_path,
+      "--cert=%s" % cert_path,
+      "--ca=%s" % ca_path,
+      ]
+
+    if host:
+      cmd.append("--host=%s" % host)
+
+    if port:
+      cmd.append("--port=%s" % port)
+
+    if cmd_prefix:
+      cmd.append("--cmd-prefix=%s" % cmd_prefix)
+
+    if cmd_suffix:
+      cmd.append("--cmd-suffix=%s" % cmd_suffix)
+
+    logfile = _InstanceLogName(prefix, instance.os, instance.name)
+
+    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
+    # support for receiving a file descriptor for output
+    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
+                      output=logfile)
+
+    # The import/export name is simply the status directory name
+    return os.path.basename(status_dir)
+
+  except Exception:
+    shutil.rmtree(status_dir, ignore_errors=True)
+    raise
+
+
+def GetImportExportStatus(names):
+  """Returns import/export daemon status.
+
+  @type names: sequence
+  @param names: List of names
+  @rtype: List of dicts
+  @return: Returns a list of the state of each named import/export or None if a
+           status couldn't be read
+
+  """
+  result = []
+
+  for name in names:
+    status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
+                                 _IES_STATUS_FILE)
+
+    try:
+      data = utils.ReadFile(status_file)
+    except EnvironmentError, err:
+      if err.errno != errno.ENOENT:
+        raise
+      data = None
+
+    if not data:
+      result.append(None)
+      continue
+
+    result.append(serializer.LoadJson(data))
+
+  return result
+
+
+def CleanupImportExport(name):
+  """Cleanup after an import or export.
+
+  If the import/export daemon is still running it's killed. Afterwards the
+  whole status directory is removed.
+
+  """
+  logging.info("Finalizing import/export %s", name)
+
+  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
+  pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
+
+  pid = None
+  try:
+    fd = os.open(pid_file, os.O_RDONLY)
+  except EnvironmentError, err:
+    if err.errno != errno.ENOENT:
+      raise
+    # PID file doesn't exist
+  else:
+    try:
+      try:
+        # Try to acquire lock
+        utils.LockFile(fd)
+      except errors.LockError:
+        # Couldn't lock, daemon is running
+        pid = int(os.read(fd, 100))
+    finally:
+      os.close(fd)
+
+  if pid:
+    logging.info("Import/export %s is still running with PID %s",
+                 name, pid)
+    utils.KillProcess(pid, waitpid=False)
+
+  shutil.rmtree(status_dir, ignore_errors=True)
+
+
 def _FindDisks(nodes_ip, disks):
   """Sets the physical ID on disks and returns the block devices.
 
index b38a5bd..6a35a42 100644 (file)
@@ -93,6 +93,8 @@ SOCKET_DIR = RUN_GANETI_DIR + "/socket"
 SOCKET_DIR_MODE = 0700
 CRYPTO_KEYS_DIR = RUN_GANETI_DIR + "/crypto"
 CRYPTO_KEYS_DIR_MODE = 0700
+IMPORT_EXPORT_DIR = RUN_GANETI_DIR + "/import-export"
+IMPORT_EXPORT_DIR_MODE = 0755
 # keep RUN_GANETI_DIR first here, to make sure all get created when the node
 # daemon is started (this takes care of RUN_DIR being tmpfs)
 SUB_RUN_DIRS = [ RUN_GANETI_DIR, BDEV_CACHE_DIR, DISK_LINKS_DIR ]
@@ -199,6 +201,14 @@ IMPORT_EXPORT_DAEMON = _autoconf.PKGLIBDIR + "/import-export"
 IEM_IMPORT = "import"
 IEM_EXPORT = "export"
 
+# Import/export I/O
+# Direct file I/O, equivalent to a shell's I/O redirection using '<' or '>'
+IEIO_FILE = "file"
+# Raw block device I/O using "dd"
+IEIO_RAW_DISK = "raw"
+# OS definition import/export script
+IEIO_SCRIPT = "script"
+
 VALUE_DEFAULT = "default"
 VALUE_AUTO = "auto"
 VALUE_GENERATE = "generate"
index 3f1fffb..b99b49e 100644 (file)
@@ -258,6 +258,21 @@ class Client:
     return results
 
 
+def _EncodeImportExportIO(ieio, ieioargs):
+  """Encodes import/export I/O information.
+
+  """
+  if ieio == constants.IEIO_RAW_DISK:
+    assert len(ieioargs) == 1
+    return (ieioargs[0].ToDict(), )
+
+  if ieio == constants.IEIO_SCRIPT:
+    assert len(ieioargs) == 2
+    return (ieioargs[0].ToDict(), ieioargs[1])
+
+  return ieioargs
+
+
 class RpcRunner(object):
   """RPC runner class"""
 
@@ -1209,3 +1224,79 @@ class RpcRunner(object):
 
     """
     return self._SingleNodeCall(node, "remove_x509_certificate", [name])
+
+  def call_start_import_listener(self, node, x509_key_name, source_x509_ca,
+                                 instance, dest, dest_args):
+    """Starts a listener for an import.
+
+    This is a single-node call.
+
+    @type node: string
+    @param node: Node name
+    @type instance: C{objects.Instance}
+    @param instance: Instance object
+
+    """
+    return self._SingleNodeCall(node, "start_import_listener",
+                                [x509_key_name, source_x509_ca,
+                                 self._InstDict(instance), dest,
+                                 _EncodeImportExportIO(dest, dest_args)])
+
+  def call_start_export(self, node, x509_key_name, dest_x509_ca, host, port,
+                        instance, source, source_args):
+    """Starts an export daemon.
+
+    This is a single-node call.
+
+    @type node: string
+    @param node: Node name
+    @type instance: C{objects.Instance}
+    @param instance: Instance object
+
+    """
+    return self._SingleNodeCall(node, "start_export",
+                                [x509_key_name, dest_x509_ca, host, port,
+                                 self._InstDict(instance), source,
+                                 _EncodeImportExportIO(source, source_args)])
+
+  def call_get_import_export_status(self, node, names):
+    """Gets the status of an import or export.
+
+    This is a single-node call.
+
+    @type node: string
+    @param node: Node name
+    @type names: List of strings
+    @param names: Import/export names
+    @rtype: List of L{objects.ImportExportStatus} instances
+    @return: Returns a list of the state of each named import/export or None if
+             a status couldn't be retrieved
+
+    """
+    result = self._SingleNodeCall(node, "get_import_export_status", [names])
+
+    if not result.fail_msg:
+      decoded = []
+
+      for i in result.payload:
+        if i is None:
+          decoded.append(None)
+          continue
+        decoded.append(objects.ImportExportStatus.FromDict(i))
+
+      result.payload = decoded
+
+    return result
+
+  def call_cleanup_import_export(self, node, name):
+    """Cleans up after an import or export.
+
+    This is a single-node call.
+
+    @type node: string
+    @param node: Node name
+    @type name: string
+    @param name: Import/export name
+
+    """
+    return self._SingleNodeCall(node, "cleanup_import_export", [name])