return wrapper
+def _DecodeImportExportIO(ieio, ieioargs):
+  """Decodes import/export I/O information.
+
+  For raw disk and script I/O the first argument is a serialized disk, which
+  is converted back into an L{objects.Disk}. Arguments of any other I/O type
+  are returned unchanged.
+
+  @param ieio: Input/output type
+  @param ieioargs: Input/output arguments in their serialized form
+  @return: Input/output arguments with the disk (if any) deserialized
+
+  """
+  if ieio == constants.IEIO_RAW_DISK:
+    assert len(ieioargs) == 1
+    disk = objects.Disk.FromDict(ieioargs[0])
+    return (disk, )
+
+  if ieio == constants.IEIO_SCRIPT:
+    assert len(ieioargs) == 2
+    disk = objects.Disk.FromDict(ieioargs[0])
+    disk_index = ieioargs[1]
+    return (disk, disk_index)
+
+  # All other I/O types carry no objects needing conversion
+  return ieioargs
+
+
class NodeHttpServer(http.server.HttpServer):
"""The server implementation.
(name, ) = params
return backend.RemoveX509Certificate(name)
+ # Import and export
+
+  @staticmethod
+  def perspective_start_import_listener(params):
+    """Starts an import daemon.
+
+    @param params: Sequence of (X509 key name, source X509 CA, serialized
+      instance, destination I/O type, destination I/O arguments)
+    @return: Whatever L{backend.StartImportExportDaemon} returns
+
+    """
+    (x509_key_name, source_x509_ca, instance, dest, dest_args) = params
+
+    inst = objects.Instance.FromDict(instance)
+    ieioargs = _DecodeImportExportIO(dest, dest_args)
+
+    # No remote host or port is passed for an import
+    return backend.StartImportExportDaemon(constants.IEM_IMPORT,
+                                           x509_key_name, source_x509_ca,
+                                           None, None,
+                                           inst, dest, ieioargs)
+  @staticmethod
+  def perspective_start_export(params):
+    """Starts an export daemon.
+
+    @param params: Sequence of (X509 key name, destination X509 CA, remote
+      host, remote port, serialized instance, source I/O type, source I/O
+      arguments)
+    @return: Whatever L{backend.StartImportExportDaemon} returns
+
+    """
+    (x509_key_name, dest_x509_ca, host, port, instance,
+     source, source_args) = params
+
+    inst = objects.Instance.FromDict(instance)
+    ieioargs = _DecodeImportExportIO(source, source_args)
+
+    return backend.StartImportExportDaemon(constants.IEM_EXPORT,
+                                           x509_key_name, dest_x509_ca,
+                                           host, port,
+                                           inst, source, ieioargs)
+
+  @staticmethod
+  def perspective_get_import_export_status(params):
+    """Retrieves the status of an import or export daemon.
+
+    @param params: Sequence whose first element is the list of import/export
+      names to query
+    @return: Whatever L{backend.GetImportExportStatus} returns
+
+    """
+    names = params[0]
+    return backend.GetImportExportStatus(names)
+
+  @staticmethod
+  def perspective_cleanup_import_export(params):
+    """Cleans up after an import or export.
+
+    @param params: Sequence whose first element is the import/export name
+    @return: Whatever L{backend.CleanupImportExport} returns
+
+    """
+    name = params[0]
+    return backend.CleanupImportExport(name)
+
def CheckNoded(_, args):
"""Initial checks whether to run or exit with a failure.
dirs.append((constants.LOG_OS_DIR, 0750))
dirs.append((constants.LOCK_DIR, 1777))
dirs.append((constants.CRYPTO_KEYS_DIR, constants.CRYPTO_KEYS_DIR_MODE))
+ dirs.append((constants.IMPORT_EXPORT_DIR, constants.IMPORT_EXPORT_DIR_MODE))
daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded,
default_ssl_cert=constants.NODED_CERT_FILE,
default_ssl_key=constants.NODED_CERT_FILE)
from ganeti import bdev
from ganeti import objects
from ganeti import ssconf
+from ganeti import serializer
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
+_IES_STATUS_FILE = "status"
+_IES_PID_FILE = "pid"
+_IES_CA_FILE = "ca"
class RPCFail(Exception):
@param instance: the name of the instance being imported/added/etc.
"""
+ # TODO: Use tempfile.mkstemp to create unique filename
base = ("%s-%s-%s-%s.log" %
(kind, os_name, instance, utils.TimestampForFilename()))
return utils.PathJoin(constants.LOG_OS_DIR, base)
cert_dir, err)
+def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
+  """Returns the command for the requested input/output.
+
+  @type instance: L{objects.Instance}
+  @param instance: The instance object
+  @param mode: Import/export mode
+  @param ieio: Input/output type
+  @param ieargs: Input/output arguments
+  @rtype: tuple
+  @return: Tuple of (environment, command prefix, command suffix); entries
+    not set for the requested I/O type and mode are None
+
+  """
+  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
+
+  env = None
+  prefix = None
+  suffix = None
+
+  if ieio == constants.IEIO_FILE:
+    (filename, ) = ieargs
+
+    if not utils.IsNormAbsPath(filename):
+      _Fail("Path '%s' is not normalized or absolute", filename)
+
+    directory = os.path.normpath(os.path.dirname(filename))
+
+    # os.path.commonprefix compares character by character and would hence
+    # also accept sibling directories which merely share the exports
+    # directory's name as a prefix (e.g. "<exports>-other"); compare on a
+    # path component boundary instead
+    exports_root = constants.EXPORT_DIR.rstrip(os.sep) + os.sep
+
+    if not (directory + os.sep).startswith(exports_root):
+      _Fail("File '%s' is not under exports directory '%s'",
+            filename, constants.EXPORT_DIR)
+
+    # Create directory
+    utils.Makedirs(directory, mode=0750)
+
+    quoted_filename = utils.ShellQuote(filename)
+
+    if mode == constants.IEM_IMPORT:
+      suffix = "> %s" % quoted_filename
+    elif mode == constants.IEM_EXPORT:
+      suffix = "< %s" % quoted_filename
+
+  elif ieio == constants.IEIO_RAW_DISK:
+    (disk, ) = ieargs
+
+    real_disk = _OpenRealBD(disk)
+
+    if mode == constants.IEM_IMPORT:
+      # we set here a smaller block size as, due to transport buffering, more
+      # than 64-128k will mostly be ignored; we use nocreat to fail if the
+      # device is not already there or we pass a wrong path; we use notrunc
+      # to not attempt to truncate an LV device; we use oflag=dsync to not
+      # buffer too much memory; this means that at best, we flush every 64k,
+      # which will not be very fast
+      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
+                                    " bs=%s oflag=dsync"),
+                                   real_disk.dev_path,
+                                   str(64 * 1024))
+
+    elif mode == constants.IEM_EXPORT:
+      # the block size on the read dd is 1MiB to match our units
+      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
+                                   real_disk.dev_path,
+                                   str(1024 * 1024), # 1 MB
+                                   str(disk.size))
+
+  elif ieio == constants.IEIO_SCRIPT:
+    (disk, disk_index, ) = ieargs
+
+    assert isinstance(disk_index, (int, long))
+
+    real_disk = _OpenRealBD(disk)
+
+    inst_os = OSFromDisk(instance.os)
+    env = OSEnvironment(instance, inst_os)
+
+    if mode == constants.IEM_IMPORT:
+      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
+      env["IMPORT_INDEX"] = str(disk_index)
+      script = inst_os.import_script
+
+    elif mode == constants.IEM_EXPORT:
+      env["EXPORT_DEVICE"] = real_disk.dev_path
+      env["EXPORT_INDEX"] = str(disk_index)
+      script = inst_os.export_script
+
+    # TODO: Pass special environment only to script
+    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
+
+    # On import the script consumes the data stream, on export it produces it
+    if mode == constants.IEM_IMPORT:
+      suffix = "| %s" % script_cmd
+
+    elif mode == constants.IEM_EXPORT:
+      prefix = "%s |" % script_cmd
+
+  else:
+    _Fail("Invalid %s I/O mode %r", mode, ieio)
+
+  return (env, prefix, suffix)
+
+
+def _CreateImportExportStatusDir(prefix):
+  """Creates status directory for import/export.
+
+  @param prefix: String the directory name starts with; a timestamp is
+    appended to it
+  @return: Path of the newly created directory, as returned by
+    C{tempfile.mkdtemp}
+
+  """
+  dir_prefix = "%s-%s-" % (prefix, utils.TimestampForFilename())
+
+  return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR, prefix=dir_prefix)
+
+
+def StartImportExportDaemon(mode, key_name, ca, host, port, instance,
+ ieio, ieioargs):
+ """Starts an import or export daemon.
+
+ A status directory is created for the daemon; if anything fails after its
+ creation the directory is removed again and the exception is re-raised.
+
+ @param mode: Import/export mode
+ @type key_name: string
+ @param key_name: RSA key name (None to use cluster certificate)
+ @type ca: string
+ @param ca: Remote CA in PEM format (None to use cluster certificate)
+ @type host: string
+ @param host: Remote host for export (None for import)
+ @type port: int
+ @param port: Remote port for export (None for import)
+ @type instance: L{objects.Instance}
+ @param instance: Instance object
+ @param ieio: Input/output type
+ @param ieioargs: Input/output arguments
+ @rtype: string
+ @return: Name of the import/export, which is the base name of its status
+ directory
+
+ """
+ if mode == constants.IEM_IMPORT:
+ prefix = "import"
+
+ if not (host is None and port is None):
+ _Fail("Can not specify host or port on import")
+
+ elif mode == constants.IEM_EXPORT:
+ prefix = "export"
+
+ if host is None or port is None:
+ _Fail("Host and port must be specified for an export")
+
+ else:
+ _Fail("Invalid mode %r", mode)
+
+ # Key name and CA must either both be given or both be None
+ if (key_name is None) ^ (ca is None):
+ _Fail("Cluster certificate can only be used for both key and CA")
+
+ (cmd_env, cmd_prefix, cmd_suffix) = \
+ _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
+
+ if key_name is None:
+ # Use server.pem
+ key_path = constants.NODED_CERT_FILE
+ cert_path = constants.NODED_CERT_FILE
+ assert ca is None
+ else:
+ (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
+ key_name)
+ assert ca is not None
+
+ status_dir = _CreateImportExportStatusDir(prefix)
+ try:
+ status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
+ pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
+
+ if ca is None:
+ # Use server.pem
+ # TODO: If socat runs as a non-root user, this might need to be copied to
+ # a separate file
+ ca_path = constants.NODED_CERT_FILE
+ else:
+ # The remote CA is stored inside the status directory
+ ca_path = utils.PathJoin(status_dir, _IES_CA_FILE)
+ utils.WriteFile(ca_path, data=ca, mode=0400)
+
+ cmd = [
+ constants.IMPORT_EXPORT_DAEMON,
+ status_file, mode,
+ "--key=%s" % key_path,
+ "--cert=%s" % cert_path,
+ "--ca=%s" % ca_path,
+ ]
+
+ # Optional arguments are only added when they have a value
+ if host:
+ cmd.append("--host=%s" % host)
+
+ if port:
+ cmd.append("--port=%s" % port)
+
+ if cmd_prefix:
+ cmd.append("--cmd-prefix=%s" % cmd_prefix)
+
+ if cmd_suffix:
+ cmd.append("--cmd-suffix=%s" % cmd_suffix)
+
+ logfile = _InstanceLogName(prefix, instance.os, instance.name)
+
+ # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
+ # support for receiving a file descriptor for output
+ utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
+ output=logfile)
+
+ # The import/export name is simply the status directory name
+ return os.path.basename(status_dir)
+
+ except Exception:
+ # Don't leave the status directory behind if starting the daemon failed
+ shutil.rmtree(status_dir, ignore_errors=True)
+ raise
+
+
+def GetImportExportStatus(names):
+  """Returns import/export daemon status.
+
+  @type names: sequence
+  @param names: List of names
+  @rtype: List of dicts
+  @return: One entry per name, in the same order; an entry is the data
+    loaded from the status file or None if the file doesn't exist or is
+    empty
+
+  """
+  statuses = []
+
+  for name in names:
+    path = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
+                          _IES_STATUS_FILE)
+
+    try:
+      content = utils.ReadFile(path)
+    except EnvironmentError, err:
+      if err.errno == errno.ENOENT:
+        # A missing status file is reported like an empty one
+        content = None
+      else:
+        raise
+
+    if content:
+      statuses.append(serializer.LoadJson(content))
+    else:
+      statuses.append(None)
+
+  return statuses
+
+
+def CleanupImportExport(name):
+ """Cleanup after an import or export.
+
+ If the import/export daemon is still running it's killed. Afterwards the
+ whole status directory is removed. Whether the daemon is still running is
+ determined by trying to lock its PID file.
+
+ @type name: string
+ @param name: Import/export name, i.e. the name of the status directory
+ below the import/export directory
+
+ """
+ logging.info("Finalizing import/export %s", name)
+
+ status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
+ pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
+
+ # Stays None unless a running daemon is detected below
+ pid = None
+ try:
+ fd = os.open(pid_file, os.O_RDONLY)
+ except EnvironmentError, err:
+ if err.errno != errno.ENOENT:
+ raise
+ # PID file doesn't exist
+ else:
+ # Nested try blocks make sure the file descriptor is closed in any case
+ try:
+ try:
+ # Try to acquire lock
+ utils.LockFile(fd)
+ except errors.LockError:
+ # Couldn't lock, daemon is running
+ pid = int(os.read(fd, 100))
+ finally:
+ os.close(fd)
+
+ if pid:
+ logging.info("Import/export %s is still running with PID %s",
+ name, pid)
+ utils.KillProcess(pid, waitpid=False)
+
+ # Errors while removing the status directory are ignored
+ shutil.rmtree(status_dir, ignore_errors=True)
+
+
def _FindDisks(nodes_ip, disks):
"""Sets the physical ID on disks and returns the block devices.
return results
+def _EncodeImportExportIO(ieio, ieioargs):
+  """Encodes import/export I/O information.
+
+  For raw disk and script I/O the first argument is a disk object, which is
+  converted to a dictionary via its C{ToDict} method. Arguments of any other
+  I/O type are returned unchanged.
+
+  @param ieio: Input/output type
+  @param ieioargs: Input/output arguments
+  @return: Input/output arguments with the disk (if any) serialized
+
+  """
+  if ieio == constants.IEIO_RAW_DISK:
+    assert len(ieioargs) == 1
+    disk_dict = ieioargs[0].ToDict()
+    return (disk_dict, )
+
+  if ieio == constants.IEIO_SCRIPT:
+    assert len(ieioargs) == 2
+    disk_dict = ieioargs[0].ToDict()
+    disk_index = ieioargs[1]
+    return (disk_dict, disk_index)
+
+  # All other I/O types carry no objects needing conversion
+  return ieioargs
+
+
class RpcRunner(object):
"""RPC runner class"""
"""
return self._SingleNodeCall(node, "remove_x509_certificate", [name])
+
+  def call_start_import_listener(self, node, x509_key_name, source_x509_ca,
+                                 instance, dest, dest_args):
+    """Starts a listener for an import.
+
+    This is a single-node call.
+
+    @type node: string
+    @param node: Node name
+    @param x509_key_name: X509 key name
+    @param source_x509_ca: Source X509 CA
+    @type instance: C{objects.Instance}
+    @param instance: Instance object
+    @param dest: Destination I/O type
+    @param dest_args: Destination I/O arguments
+
+    """
+    inst_dict = self._InstDict(instance)
+    encoded_args = _EncodeImportExportIO(dest, dest_args)
+
+    rpc_args = [x509_key_name, source_x509_ca, inst_dict, dest, encoded_args]
+
+    return self._SingleNodeCall(node, "start_import_listener", rpc_args)
+
+  def call_start_export(self, node, x509_key_name, dest_x509_ca, host, port,
+                        instance, source, source_args):
+    """Starts an export daemon.
+
+    This is a single-node call.
+
+    @type node: string
+    @param node: Node name
+    @param x509_key_name: X509 key name
+    @param dest_x509_ca: Destination X509 CA
+    @param host: Remote host
+    @param port: Remote port
+    @type instance: C{objects.Instance}
+    @param instance: Instance object
+    @param source: Source I/O type
+    @param source_args: Source I/O arguments
+
+    """
+    inst_dict = self._InstDict(instance)
+    encoded_args = _EncodeImportExportIO(source, source_args)
+
+    rpc_args = [x509_key_name, dest_x509_ca, host, port,
+                inst_dict, source, encoded_args]
+
+    return self._SingleNodeCall(node, "start_export", rpc_args)
+
+  def call_get_import_export_status(self, node, names):
+    """Gets the status of an import or export.
+
+    This is a single-node call.
+
+    @type node: string
+    @param node: Node name
+    @type names: List of strings
+    @param names: Import/export names
+    @return: The RPC result; unless the call failed, its payload is replaced
+      by a list containing, for each entry of the original payload, either
+      an L{objects.ImportExportStatus} instance or None if a status
+      couldn't be retrieved
+
+    """
+    result = self._SingleNodeCall(node, "get_import_export_status", [names])
+
+    # Failed calls are passed on without touching the payload
+    if result.fail_msg:
+      return result
+
+    statuses = []
+
+    for entry in result.payload:
+      if entry is None:
+        statuses.append(None)
+      else:
+        statuses.append(objects.ImportExportStatus.FromDict(entry))
+
+    result.payload = statuses
+
+    return result
+
+  def call_cleanup_import_export(self, node, name):
+    """Cleans up after an import or export.
+
+    This is a single-node call.
+
+    @type node: string
+    @param node: Node name
+    @type name: string
+    @param name: Import/export name
+
+    """
+    rpc_args = [name]
+
+    return self._SingleNodeCall(node, "cleanup_import_export", rpc_args)