jqueue: make replication on job update optional
[ganeti-local] / lib / backend.py
index 769e6a6..2a158d6 100644 (file)
@@ -47,6 +47,7 @@ import logging
 import tempfile
 import zlib
 import base64
+import signal
 
 from ganeti import errors
 from ganeti import utils
@@ -56,6 +57,7 @@ from ganeti import constants
 from ganeti import bdev
 from ganeti import objects
 from ganeti import ssconf
+from ganeti import serializer
 
 
 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
@@ -63,7 +65,14 @@ _ALLOWED_CLEAN_DIRS = frozenset([
   constants.DATA_DIR,
   constants.JOB_QUEUE_ARCHIVE_DIR,
   constants.QUEUE_DIR,
+  constants.CRYPTO_KEYS_DIR,
   ])
+_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
+_X509_KEY_FILE = "key"
+_X509_CERT_FILE = "cert"
+_IES_STATUS_FILE = "status"
+_IES_PID_FILE = "pid"
+_IES_CA_FILE = "ca"
 
 
 class RPCFail(Exception):
@@ -184,6 +193,7 @@ def _BuildUploadFileList():
     constants.RAPI_CERT_FILE,
     constants.RAPI_USERS_FILE,
     constants.CONFD_HMAC_KEY,
+    constants.CLUSTER_DOMAIN_SECRET_FILE,
     ])
 
   for hv_name in constants.HYPER_TYPES:
@@ -385,6 +395,7 @@ def LeaveCluster(modify_ssh_setup):
 
   """
   _CleanDirectory(constants.DATA_DIR)
+  _CleanDirectory(constants.CRYPTO_KEYS_DIR)
   JobQueuePurge()
 
   if modify_ssh_setup:
@@ -838,6 +849,7 @@ def _InstanceLogName(kind, os_name, instance):
   @param instance: the name of the instance being imported/added/etc.
 
   """
+  # TODO: Use tempfile.mkstemp to create unique filename
   base = ("%s-%s-%s-%s.log" %
           (kind, os_name, instance, utils.TimestampForFilename()))
   return utils.PathJoin(constants.LOG_OS_DIR, base)
@@ -1899,11 +1911,9 @@ def OSFromDisk(name, base_dir=None):
   return payload
 
 
-def OSEnvironment(instance, inst_os, debug=0):
-  """Calculate the environment for an os script.
+def OSCoreEnv(inst_os, debug=0):
+  """Calculate the basic environment for an os script.
 
-  @type instance: L{objects.Instance}
-  @param instance: target instance for the os script run
   @type inst_os: L{objects.OS}
   @param inst_os: operating system for which the environment is being built
   @type debug: integer
@@ -1918,18 +1928,44 @@ def OSEnvironment(instance, inst_os, debug=0):
   api_version = \
     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
   result['OS_API_VERSION'] = '%d' % api_version
-  result['INSTANCE_NAME'] = instance.name
-  result['INSTANCE_OS'] = instance.os
-  result['HYPERVISOR'] = instance.hypervisor
-  result['DISK_COUNT'] = '%d' % len(instance.disks)
-  result['NIC_COUNT'] = '%d' % len(instance.nics)
+  result['OS_NAME'] = inst_os.name
   result['DEBUG_LEVEL'] = '%d' % debug
+
+  # OS variants
   if api_version >= constants.OS_API_V15:
     try:
-      variant = instance.os.split('+', 1)[1]
+      variant = inst_os.name.split('+', 1)[1]
     except IndexError:
       variant = inst_os.supported_variants[0]
     result['OS_VARIANT'] = variant
+
+  return result
+
+
+def OSEnvironment(instance, inst_os, debug=0):
+  """Calculate the environment for an os script.
+
+  @type instance: L{objects.Instance}
+  @param instance: target instance for the os script run
+  @type inst_os: L{objects.OS}
+  @param inst_os: operating system for which the environment is being built
+  @type debug: integer
+  @param debug: debug level (0 or 1, for OS Api 10)
+  @rtype: dict
+  @return: dict of environment variables
+  @raise errors.BlockDeviceError: if the block device
+      cannot be found
+
+  """
+  result = OSCoreEnv(inst_os, debug)
+
+  result['INSTANCE_NAME'] = instance.name
+  result['INSTANCE_OS'] = instance.os
+  result['HYPERVISOR'] = instance.hypervisor
+  result['DISK_COUNT'] = '%d' % len(instance.disks)
+  result['NIC_COUNT'] = '%d' % len(instance.nics)
+
+  # Disks
   for idx, disk in enumerate(instance.disks):
     real_disk = _OpenRealBD(disk)
     result['DISK_%d_PATH' % idx] = real_disk.dev_path
@@ -1942,6 +1978,8 @@ def OSEnvironment(instance, inst_os, debug=0):
     elif disk.dev_type == constants.LD_FILE:
       result['DISK_%d_BACKEND_TYPE' % idx] = \
         'file:%s' % disk.physical_id[0]
+
+  # NICs
   for idx, nic in enumerate(instance.nics):
     result['NIC_%d_MAC' % idx] = nic.mac
     if nic.ip:
@@ -1955,6 +1993,7 @@ def OSEnvironment(instance, inst_os, debug=0):
       result['NIC_%d_FRONTEND_TYPE' % idx] = \
         instance.hvparams[constants.HV_NIC_TYPE]
 
+  # HV/BE params
   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
     for key, value in source.items():
       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
@@ -2016,64 +2055,6 @@ def BlockdevSnapshot(disk):
           disk.unique_id, disk.dev_type)
 
 
-def ExportSnapshot(disk, dest_node, instance, cluster_name, idx, debug):
-  """Export a block device snapshot to a remote node.
-
-  @type disk: L{objects.Disk}
-  @param disk: the description of the disk to export
-  @type dest_node: str
-  @param dest_node: the destination node to export to
-  @type instance: L{objects.Instance}
-  @param instance: the instance object to whom the disk belongs
-  @type cluster_name: str
-  @param cluster_name: the cluster name, needed for SSH hostalias
-  @type idx: int
-  @param idx: the index of the disk in the instance's disk list,
-      used to export to the OS scripts environment
-  @type debug: integer
-  @param debug: debug level, passed to the OS scripts
-  @rtype: None
-
-  """
-  inst_os = OSFromDisk(instance.os)
-  export_env = OSEnvironment(instance, inst_os, debug)
-
-  export_script = inst_os.export_script
-
-  logfile = _InstanceLogName("export", inst_os.name, instance.name)
-
-  real_disk = _OpenRealBD(disk)
-
-  export_env['EXPORT_DEVICE'] = real_disk.dev_path
-  export_env['EXPORT_INDEX'] = str(idx)
-
-  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
-  destfile = disk.physical_id[1]
-
-  # the target command is built out of three individual commands,
-  # which are joined by pipes; we check each individual command for
-  # valid parameters
-  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
-                               inst_os.path, export_script, logfile)
-
-  comprcmd = "gzip"
-
-  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s",
-                                destdir, utils.PathJoin(destdir, destfile))
-  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
-                                                   constants.GANETI_RUNAS,
-                                                   destcmd)
-
-  # all commands have been checked, so we're safe to combine them
-  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
-
-  result = utils.RunCmd(["bash", "-c", command], env=export_env)
-
-  if result.failed:
-    _Fail("OS snapshot export command '%s' returned error: %s"
-          " output: %s", command, result.fail_reason, result.output)
-
-
 def FinalizeExport(instance, snap_disks):
   """Write out the export configuration information.
 
@@ -2173,53 +2154,6 @@ def ExportInfo(dest):
   return config.Dumps()
 
 
-def ImportOSIntoInstance(instance, src_node, src_images, cluster_name, debug):
-  """Import an os image into an instance.
-
-  @type instance: L{objects.Instance}
-  @param instance: instance to import the disks into
-  @type src_node: string
-  @param src_node: source node for the disk images
-  @type src_images: list of string
-  @param src_images: absolute paths of the disk images
-  @type debug: integer
-  @param debug: debug level, passed to the OS scripts
-  @rtype: list of boolean
-  @return: each boolean represent the success of importing the n-th disk
-
-  """
-  inst_os = OSFromDisk(instance.os)
-  import_env = OSEnvironment(instance, inst_os, debug)
-  import_script = inst_os.import_script
-
-  logfile = _InstanceLogName("import", instance.os, instance.name)
-
-  comprcmd = "gunzip"
-  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
-                               import_script, logfile)
-
-  final_result = []
-  for idx, image in enumerate(src_images):
-    if image:
-      destcmd = utils.BuildShellCmd('cat %s', image)
-      remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
-                                                       constants.GANETI_RUNAS,
-                                                       destcmd)
-      command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
-      import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
-      import_env['IMPORT_INDEX'] = str(idx)
-      result = utils.RunCmd(command, env=import_env)
-      if result.failed:
-        logging.error("Disk import command '%s' returned error: %s"
-                      " output: %s", command, result.fail_reason,
-                      result.output)
-        final_result.append("error importing disk %d: %s, %s" %
-                            (idx, result.fail_reason, result.output[-100]))
-
-  if final_result:
-    _Fail("; ".join(final_result), log=False)
-
-
 def ListExports():
   """Return a list of exports currently available on this machine.
 
@@ -2228,7 +2162,7 @@ def ListExports():
 
   """
   if os.path.isdir(constants.EXPORT_DIR):
-    return utils.ListVisibleFiles(constants.EXPORT_DIR)
+    return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
   else:
     _Fail("No exports directory")
 
@@ -2450,24 +2384,6 @@ def JobQueueRename(old, new):
   utils.RenameFile(old, new, mkdir=True)
 
 
-def JobQueueSetDrainFlag(drain_flag):
-  """Set the drain flag for the queue.
-
-  This will set or unset the queue drain flag.
-
-  @type drain_flag: boolean
-  @param drain_flag: if True, will set the drain flag, otherwise reset it.
-  @rtype: truple
-  @return: always True, None
-  @warning: the function always returns True
-
-  """
-  if drain_flag:
-    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
-  else:
-    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
-
-
 def BlockdevClose(instance_name, disks):
   """Closes the given block devices.
 
@@ -2545,6 +2461,363 @@ def DemoteFromMC():
   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
 
 
+def _GetX509Filenames(cryptodir, name):
+  """Returns the full paths for the private key and certificate.
+
+  """
+  return (utils.PathJoin(cryptodir, name),
+          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
+          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
+
+
+def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
+  """Creates a new X509 certificate for SSL/TLS.
+
+  @type validity: int
+  @param validity: Validity in seconds
+  @rtype: tuple; (string, string)
+  @return: Certificate name and public part
+
+  """
+  (key_pem, cert_pem) = \
+    utils.GenerateSelfSignedX509Cert(utils.HostInfo.SysName(),
+                                     min(validity, _MAX_SSL_CERT_VALIDITY))
+
+  cert_dir = tempfile.mkdtemp(dir=cryptodir,
+                              prefix="x509-%s-" % utils.TimestampForFilename())
+  try:
+    name = os.path.basename(cert_dir)
+    assert len(name) > 5
+
+    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
+
+    utils.WriteFile(key_file, mode=0400, data=key_pem)
+    utils.WriteFile(cert_file, mode=0400, data=cert_pem)
+
+    # Never return private key as it shouldn't leave the node
+    return (name, cert_pem)
+  except Exception:
+    shutil.rmtree(cert_dir, ignore_errors=True)
+    raise
+
+
+def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
+  """Removes a X509 certificate.
+
+  @type name: string
+  @param name: Certificate name
+
+  """
+  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
+
+  utils.RemoveFile(key_file)
+  utils.RemoveFile(cert_file)
+
+  try:
+    os.rmdir(cert_dir)
+  except EnvironmentError, err:
+    _Fail("Cannot remove certificate directory '%s': %s",
+          cert_dir, err)
+
+
+def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
+  """Returns the command for the requested input/output.
+
+  @type instance: L{objects.Instance}
+  @param instance: The instance object
+  @param mode: Import/export mode
+  @param ieio: Input/output type
+  @param ieargs: Input/output arguments
+
+  """
+  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
+
+  env = None
+  prefix = None
+  suffix = None
+  exp_size = None
+
+  if ieio == constants.IEIO_FILE:
+    (filename, ) = ieargs
+
+    if not utils.IsNormAbsPath(filename):
+      _Fail("Path '%s' is not normalized or absolute", filename)
+
+    directory = os.path.normpath(os.path.dirname(filename))
+
+    if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
+        constants.EXPORT_DIR):
+      _Fail("File '%s' is not under exports directory '%s'",
+            filename, constants.EXPORT_DIR)
+
+    # Create directory
+    utils.Makedirs(directory, mode=0750)
+
+    quoted_filename = utils.ShellQuote(filename)
+
+    if mode == constants.IEM_IMPORT:
+      suffix = "> %s" % quoted_filename
+    elif mode == constants.IEM_EXPORT:
+      suffix = "< %s" % quoted_filename
+
+      # Retrieve file size
+      try:
+        st = os.stat(filename)
+      except EnvironmentError, err:
+        logging.error("Can't stat(2) %s: %s", filename, err)
+      else:
+        exp_size = utils.BytesToMebibyte(st.st_size)
+
+  elif ieio == constants.IEIO_RAW_DISK:
+    (disk, ) = ieargs
+
+    real_disk = _OpenRealBD(disk)
+
+    if mode == constants.IEM_IMPORT:
+      # we set here a smaller block size as, due to transport buffering, more
+      # than 64-128k will mostly ignored; we use nocreat to fail if the device
+      # is not already there or we pass a wrong path; we use notrunc to no
+      # attempt truncate on an LV device; we use oflag=dsync to not buffer too
+      # much memory; this means that at best, we flush every 64k, which will
+      # not be very fast
+      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
+                                    " bs=%s oflag=dsync"),
+                                    real_disk.dev_path,
+                                    str(64 * 1024))
+
+    elif mode == constants.IEM_EXPORT:
+      # the block size on the read dd is 1MiB to match our units
+      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
+                                   real_disk.dev_path,
+                                   str(1024 * 1024), # 1 MB
+                                   str(disk.size))
+      exp_size = disk.size
+
+  elif ieio == constants.IEIO_SCRIPT:
+    (disk, disk_index, ) = ieargs
+
+    assert isinstance(disk_index, (int, long))
+
+    real_disk = _OpenRealBD(disk)
+
+    inst_os = OSFromDisk(instance.os)
+    env = OSEnvironment(instance, inst_os)
+
+    if mode == constants.IEM_IMPORT:
+      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
+      env["IMPORT_INDEX"] = str(disk_index)
+      script = inst_os.import_script
+
+    elif mode == constants.IEM_EXPORT:
+      env["EXPORT_DEVICE"] = real_disk.dev_path
+      env["EXPORT_INDEX"] = str(disk_index)
+      script = inst_os.export_script
+
+    # TODO: Pass special environment only to script
+    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
+
+    if mode == constants.IEM_IMPORT:
+      suffix = "| %s" % script_cmd
+
+    elif mode == constants.IEM_EXPORT:
+      prefix = "%s |" % script_cmd
+
+    # Let script predict size
+    exp_size = constants.IE_CUSTOM_SIZE
+
+  else:
+    _Fail("Invalid %s I/O mode %r", mode, ieio)
+
+  return (env, prefix, suffix, exp_size)
+
+
+def _CreateImportExportStatusDir(prefix):
+  """Creates status directory for import/export.
+
+  """
+  return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
+                          prefix=("%s-%s-" %
+                                  (prefix, utils.TimestampForFilename())))
+
+
+def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
+  """Starts an import or export daemon.
+
+  @param mode: Import/output mode
+  @type opts: L{objects.ImportExportOptions}
+  @param opts: Daemon options
+  @type host: string
+  @param host: Remote host for export (None for import)
+  @type port: int
+  @param port: Remote port for export (None for import)
+  @type instance: L{objects.Instance}
+  @param instance: Instance object
+  @param ieio: Input/output type
+  @param ieioargs: Input/output arguments
+
+  """
+  if mode == constants.IEM_IMPORT:
+    prefix = "import"
+
+    if not (host is None and port is None):
+      _Fail("Can not specify host or port on import")
+
+  elif mode == constants.IEM_EXPORT:
+    prefix = "export"
+
+    if host is None or port is None:
+      _Fail("Host and port must be specified for an export")
+
+  else:
+    _Fail("Invalid mode %r", mode)
+
+  if (opts.key_name is None) ^ (opts.ca_pem is None):
+    _Fail("Cluster certificate can only be used for both key and CA")
+
+  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
+    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
+
+  if opts.key_name is None:
+    # Use server.pem
+    key_path = constants.NODED_CERT_FILE
+    cert_path = constants.NODED_CERT_FILE
+    assert opts.ca_pem is None
+  else:
+    (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
+                                                 opts.key_name)
+    assert opts.ca_pem is not None
+
+  for i in [key_path, cert_path]:
+    if not os.path.exists(i):
+      _Fail("File '%s' does not exist" % i)
+
+  status_dir = _CreateImportExportStatusDir(prefix)
+  try:
+    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
+    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
+    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
+
+    if opts.ca_pem is None:
+      # Use server.pem
+      ca = utils.ReadFile(constants.NODED_CERT_FILE)
+    else:
+      ca = opts.ca_pem
+
+    # Write CA file
+    utils.WriteFile(ca_file, data=ca, mode=0400)
+
+    cmd = [
+      constants.IMPORT_EXPORT_DAEMON,
+      status_file, mode,
+      "--key=%s" % key_path,
+      "--cert=%s" % cert_path,
+      "--ca=%s" % ca_file,
+      ]
+
+    if host:
+      cmd.append("--host=%s" % host)
+
+    if port:
+      cmd.append("--port=%s" % port)
+
+    if opts.compress:
+      cmd.append("--compress=%s" % opts.compress)
+
+    if opts.magic:
+      cmd.append("--magic=%s" % opts.magic)
+
+    if exp_size is not None:
+      cmd.append("--expected-size=%s" % exp_size)
+
+    if cmd_prefix:
+      cmd.append("--cmd-prefix=%s" % cmd_prefix)
+
+    if cmd_suffix:
+      cmd.append("--cmd-suffix=%s" % cmd_suffix)
+
+    logfile = _InstanceLogName(prefix, instance.os, instance.name)
+
+    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
+    # support for receiving a file descriptor for output
+    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
+                      output=logfile)
+
+    # The import/export name is simply the status directory name
+    return os.path.basename(status_dir)
+
+  except Exception:
+    shutil.rmtree(status_dir, ignore_errors=True)
+    raise
+
+
+def GetImportExportStatus(names):
+  """Returns import/export daemon status.
+
+  @type names: sequence
+  @param names: List of names
+  @rtype: List of dicts
+  @return: Returns a list of the state of each named import/export or None if a
+           status couldn't be read
+
+  """
+  result = []
+
+  for name in names:
+    status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
+                                 _IES_STATUS_FILE)
+
+    try:
+      data = utils.ReadFile(status_file)
+    except EnvironmentError, err:
+      if err.errno != errno.ENOENT:
+        raise
+      data = None
+
+    if not data:
+      result.append(None)
+      continue
+
+    result.append(serializer.LoadJson(data))
+
+  return result
+
+
+def AbortImportExport(name):
+  """Sends SIGTERM to a running import/export daemon.
+
+  """
+  logging.info("Abort import/export %s", name)
+
+  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
+  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
+
+  if pid:
+    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
+                 name, pid)
+    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
+
+
+def CleanupImportExport(name):
+  """Cleanup after an import or export.
+
+  If the import/export daemon is still running it's killed. Afterwards the
+  whole status directory is removed.
+
+  """
+  logging.info("Finalizing import/export %s", name)
+
+  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
+
+  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
+
+  if pid:
+    logging.info("Import/export %s is still running with PID %s",
+                 name, pid)
+    utils.KillProcess(pid, waitpid=False)
+
+  shutil.rmtree(status_dir, ignore_errors=True)
+
+
 def _FindDisks(nodes_ip, disks):
   """Sets the physical ID on disks and returns the block devices.