jqueue: make replication on job update optional
[ganeti-local] / lib / backend.py
index 413e9df..2a158d6 100644 (file)
@@ -193,6 +193,7 @@ def _BuildUploadFileList():
     constants.RAPI_CERT_FILE,
     constants.RAPI_USERS_FILE,
     constants.CONFD_HMAC_KEY,
+    constants.CLUSTER_DOMAIN_SECRET_FILE,
     ])
 
   for hv_name in constants.HYPER_TYPES:
@@ -1862,7 +1863,7 @@ def _TryOSFromDisk(name, base_dir=None):
         return False, ("File '%s' under path '%s' is not executable" %
                        (filename, os_dir))
 
-  variants = None
+  variants = []
   if constants.OS_VARIANTS_FILE in os_files:
     variants_file = os_files[constants.OS_VARIANTS_FILE]
     try:
@@ -1910,11 +1911,9 @@ def OSFromDisk(name, base_dir=None):
   return payload
 
 
-def OSEnvironment(instance, inst_os, debug=0):
-  """Calculate the environment for an os script.
+def OSCoreEnv(inst_os, debug=0):
+  """Calculate the basic environment for an os script.
 
-  @type instance: L{objects.Instance}
-  @param instance: target instance for the os script run
   @type inst_os: L{objects.OS}
   @param inst_os: operating system for which the environment is being built
   @type debug: integer
@@ -1929,18 +1928,44 @@ def OSEnvironment(instance, inst_os, debug=0):
   api_version = \
     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
   result['OS_API_VERSION'] = '%d' % api_version
-  result['INSTANCE_NAME'] = instance.name
-  result['INSTANCE_OS'] = instance.os
-  result['HYPERVISOR'] = instance.hypervisor
-  result['DISK_COUNT'] = '%d' % len(instance.disks)
-  result['NIC_COUNT'] = '%d' % len(instance.nics)
+  result['OS_NAME'] = inst_os.name
   result['DEBUG_LEVEL'] = '%d' % debug
+
+  # OS variants
   if api_version >= constants.OS_API_V15:
     try:
-      variant = instance.os.split('+', 1)[1]
+      variant = inst_os.name.split('+', 1)[1]
     except IndexError:
       variant = inst_os.supported_variants[0]
     result['OS_VARIANT'] = variant
+
+  return result
+
+
+def OSEnvironment(instance, inst_os, debug=0):
+  """Calculate the environment for an os script.
+
+  @type instance: L{objects.Instance}
+  @param instance: target instance for the os script run
+  @type inst_os: L{objects.OS}
+  @param inst_os: operating system for which the environment is being built
+  @type debug: integer
+  @param debug: debug level (0 or 1, for OS Api 10)
+  @rtype: dict
+  @return: dict of environment variables
+  @raise errors.BlockDeviceError: if the block device
+      cannot be found
+
+  """
+  result = OSCoreEnv(inst_os, debug)
+
+  result['INSTANCE_NAME'] = instance.name
+  result['INSTANCE_OS'] = instance.os
+  result['HYPERVISOR'] = instance.hypervisor
+  result['DISK_COUNT'] = '%d' % len(instance.disks)
+  result['NIC_COUNT'] = '%d' % len(instance.nics)
+
+  # Disks
   for idx, disk in enumerate(instance.disks):
     real_disk = _OpenRealBD(disk)
     result['DISK_%d_PATH' % idx] = real_disk.dev_path
@@ -1953,6 +1978,8 @@ def OSEnvironment(instance, inst_os, debug=0):
     elif disk.dev_type == constants.LD_FILE:
       result['DISK_%d_BACKEND_TYPE' % idx] = \
         'file:%s' % disk.physical_id[0]
+
+  # NICs
   for idx, nic in enumerate(instance.nics):
     result['NIC_%d_MAC' % idx] = nic.mac
     if nic.ip:
@@ -1966,6 +1993,7 @@ def OSEnvironment(instance, inst_os, debug=0):
       result['NIC_%d_FRONTEND_TYPE' % idx] = \
         instance.hvparams[constants.HV_NIC_TYPE]
 
+  # HV/BE params
   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
     for key, value in source.items():
       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
@@ -2134,7 +2162,7 @@ def ListExports():
 
   """
   if os.path.isdir(constants.EXPORT_DIR):
-    return utils.ListVisibleFiles(constants.EXPORT_DIR)
+    return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
   else:
     _Fail("No exports directory")
 
@@ -2356,24 +2384,6 @@ def JobQueueRename(old, new):
   utils.RenameFile(old, new, mkdir=True)
 
 
-def JobQueueSetDrainFlag(drain_flag):
-  """Set the drain flag for the queue.
-
-  This will set or unset the queue drain flag.
-
-  @type drain_flag: boolean
-  @param drain_flag: if True, will set the drain flag, otherwise reset it.
-  @rtype: truple
-  @return: always True, None
-  @warning: the function always returns True
-
-  """
-  if drain_flag:
-    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
-  else:
-    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
-
-
 def BlockdevClose(instance_name, disks):
   """Closes the given block devices.
 
@@ -2525,6 +2535,7 @@ def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
   env = None
   prefix = None
   suffix = None
+  exp_size = None
 
   if ieio == constants.IEIO_FILE:
     (filename, ) = ieargs
@@ -2549,6 +2560,14 @@ def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
     elif mode == constants.IEM_EXPORT:
       suffix = "< %s" % quoted_filename
 
+      # Retrieve file size
+      try:
+        st = os.stat(filename)
+      except EnvironmentError, err:
+        logging.error("Can't stat(2) %s: %s", filename, err)
+      else:
+        exp_size = utils.BytesToMebibyte(st.st_size)
+
   elif ieio == constants.IEIO_RAW_DISK:
     (disk, ) = ieargs
 
@@ -2572,6 +2591,7 @@ def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
                                    real_disk.dev_path,
                                    str(1024 * 1024), # 1 MB
                                    str(disk.size))
+      exp_size = disk.size
 
   elif ieio == constants.IEIO_SCRIPT:
     (disk, disk_index, ) = ieargs
@@ -2602,10 +2622,13 @@ def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
     elif mode == constants.IEM_EXPORT:
       prefix = "%s |" % script_cmd
 
+    # Let script predict size
+    exp_size = constants.IE_CUSTOM_SIZE
+
   else:
     _Fail("Invalid %s I/O mode %r", mode, ieio)
 
-  return (env, prefix, suffix)
+  return (env, prefix, suffix, exp_size)
 
 
 def _CreateImportExportStatusDir(prefix):
@@ -2617,15 +2640,12 @@ def _CreateImportExportStatusDir(prefix):
                                   (prefix, utils.TimestampForFilename())))
 
 
-def StartImportExportDaemon(mode, key_name, ca, host, port, instance,
-                            ieio, ieioargs):
+def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
   """Starts an import or export daemon.
 
   @param mode: Import/output mode
-  @type key_name: string
-  @param key_name: RSA key name (None to use cluster certificate)
-  @type ca: string:
-  @param ca: Remote CA in PEM format (None to use cluster certificate)
+  @type opts: L{objects.ImportExportOptions}
+  @param opts: Daemon options
   @type host: string
   @param host: Remote host for export (None for import)
   @type port: int
@@ -2651,21 +2671,21 @@ def StartImportExportDaemon(mode, key_name, ca, host, port, instance,
   else:
     _Fail("Invalid mode %r", mode)
 
-  if (key_name is None) ^ (ca is None):
+  if (opts.key_name is None) ^ (opts.ca_pem is None):
     _Fail("Cluster certificate can only be used for both key and CA")
 
-  (cmd_env, cmd_prefix, cmd_suffix) = \
+  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
 
-  if key_name is None:
+  if opts.key_name is None:
     # Use server.pem
     key_path = constants.NODED_CERT_FILE
     cert_path = constants.NODED_CERT_FILE
-    assert ca is None
+    assert opts.ca_pem is None
   else:
     (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
-                                                 key_name)
-    assert ca is not None
+                                                 opts.key_name)
+    assert opts.ca_pem is not None
 
   for i in [key_path, cert_path]:
     if not os.path.exists(i):
@@ -2677,10 +2697,13 @@ def StartImportExportDaemon(mode, key_name, ca, host, port, instance,
     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
 
-    if ca is None:
+    if opts.ca_pem is None:
       # Use server.pem
       ca = utils.ReadFile(constants.NODED_CERT_FILE)
+    else:
+      ca = opts.ca_pem
 
+    # Write CA file
     utils.WriteFile(ca_file, data=ca, mode=0400)
 
     cmd = [
@@ -2697,6 +2720,15 @@ def StartImportExportDaemon(mode, key_name, ca, host, port, instance,
     if port:
       cmd.append("--port=%s" % port)
 
+    if opts.compress:
+      cmd.append("--compress=%s" % opts.compress)
+
+    if opts.magic:
+      cmd.append("--magic=%s" % opts.magic)
+
+    if exp_size is not None:
+      cmd.append("--expected-size=%s" % exp_size)
+
     if cmd_prefix:
       cmd.append("--cmd-prefix=%s" % cmd_prefix)
 
@@ -2762,7 +2794,7 @@ def AbortImportExport(name):
   if pid:
     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                  name, pid)
-    os.kill(pid, signal.SIGTERM)
+    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
 
 
 def CleanupImportExport(name):