Update scripts and qa config for changed hypervisor names.
[ganeti-local] / lib / backend.py
index 2adbc04..0df7a00 100644 (file)
@@ -32,6 +32,7 @@ import re
 import subprocess
 import random
 import logging
 import subprocess
 import random
 import logging
+import tempfile
 
 from ganeti import errors
 from ganeti import utils
 
 from ganeti import errors
 from ganeti import utils
@@ -43,22 +44,62 @@ from ganeti import objects
 from ganeti import ssconf
 
 
 from ganeti import ssconf
 
 
-def _GetSshRunner():
-  return ssh.SshRunner()
+def _GetConfig():
+  return ssconf.SimpleConfigReader()
 
 
 
 
-def _GetMasterInfo():
-  """Return the master ip and netdev.
+def _GetSshRunner(cluster_name):
+  return ssh.SshRunner(cluster_name)
+
+
+def _CleanDirectory(path, exclude=[]):
+  """Removes all regular files in a directory.
+
+  @param exclude: List of files to be excluded.
+  @type exclude: list
+
+  """
+  if not os.path.isdir(path):
+    return
+
+  # Normalize excluded paths
+  exclude = [os.path.normpath(i) for i in exclude]
+
+  for rel_name in utils.ListVisibleFiles(path):
+    full_name = os.path.normpath(os.path.join(path, rel_name))
+    if full_name in exclude:
+      continue
+    if os.path.isfile(full_name) and not os.path.islink(full_name):
+      utils.RemoveFile(full_name)
+
+
+def JobQueuePurge():
+  """Removes job queue files and archived jobs
+
+  """
+  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
+  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
+
+
+def GetMasterInfo():
+  """Returns master information.
+
+  This is an utility function to compute master information, either
+  for consumption here or from the node daemon.
+
+  @rtype: tuple
+  @return: (master_netdev, master_ip, master_name)
 
   """
   try:
 
   """
   try:
-    ss = ssconf.SimpleStore()
-    master_netdev = ss.GetMasterNetdev()
-    master_ip = ss.GetMasterIP()
+    cfg = _GetConfig()
+    master_netdev = cfg.GetMasterNetdev()
+    master_ip = cfg.GetMasterIP()
+    master_node = cfg.GetMasterNode()
   except errors.ConfigurationError, err:
     logging.exception("Cluster configuration incomplete")
     return (None, None)
   except errors.ConfigurationError, err:
     logging.exception("Cluster configuration incomplete")
     return (None, None)
-  return (master_netdev, master_ip)
+  return (master_netdev, master_ip, master_node)
 
 
 def StartMaster(start_daemons):
 
 
 def StartMaster(start_daemons):
@@ -71,7 +112,7 @@ def StartMaster(start_daemons):
 
   """
   ok = True
 
   """
   ok = True
-  master_netdev, master_ip = _GetMasterInfo()
+  master_netdev, master_ip, _ = GetMasterInfo()
   if not master_netdev:
     return False
 
   if not master_netdev:
     return False
 
@@ -113,14 +154,14 @@ def StopMaster(stop_daemons):
   stop the master daemons (ganet-masterd and ganeti-rapi).
 
   """
   stop the master daemons (ganet-masterd and ganeti-rapi).
 
   """
-  master_netdev, master_ip = _GetMasterInfo()
+  master_netdev, master_ip, _ = GetMasterInfo()
   if not master_netdev:
     return False
 
   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                          "dev", master_netdev])
   if result.failed:
   if not master_netdev:
     return False
 
   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                          "dev", master_netdev])
   if result.failed:
-    logger.error("Can't remove the master IP, error: %s", result.output)
+    logging.error("Can't remove the master IP, error: %s", result.output)
     # but otherwise ignore the failure
 
   if stop_daemons:
     # but otherwise ignore the failure
 
   if stop_daemons:
@@ -168,11 +209,8 @@ def LeaveCluster():
   """Cleans up the current node and prepares it to be removed from the cluster.
 
   """
   """Cleans up the current node and prepares it to be removed from the cluster.
 
   """
-  if os.path.isdir(constants.DATA_DIR):
-    for rel_name in utils.ListVisibleFiles(constants.DATA_DIR):
-      full_name = os.path.join(constants.DATA_DIR, rel_name)
-      if os.path.isfile(full_name) and not os.path.islink(full_name):
-        utils.RemoveFile(full_name)
+  _CleanDirectory(constants.DATA_DIR)
+  JobQueuePurge()
 
   try:
     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
 
   try:
     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
@@ -193,18 +231,21 @@ def LeaveCluster():
   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
 
 
   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
 
 
-def GetNodeInfo(vgname):
+def GetNodeInfo(vgname, hypervisor_type):
   """Gives back a hash with different informations about the node.
 
   """Gives back a hash with different informations about the node.
 
-  Returns:
-    { 'vg_size' : xxx,  'vg_free' : xxx, 'memory_domain0': xxx,
-      'memory_free' : xxx, 'memory_total' : xxx }
-    where
-    vg_size is the size of the configured volume group in MiB
-    vg_free is the free size of the volume group in MiB
-    memory_dom0 is the memory allocated for domain0 in MiB
-    memory_free is the currently available (free) ram in MiB
-    memory_total is the total number of ram in MiB
+  @type vgname: C{string}
+  @param vgname: the name of the volume group to ask for disk space information
+  @type hypervisor_type: C{str}
+  @param hypervisor_type: the name of the hypervisor to ask for
+      memory information
+  @rtype: C{dict}
+  @return: dictionary with the following keys:
+      - vg_size is the size of the configured volume group in MiB
+      - vg_free is the free size of the volume group in MiB
+      - memory_dom0 is the memory allocated for domain0 in MiB
+      - memory_free is the currently available (free) ram in MiB
+      - memory_total is the total number of ram in MiB
 
   """
   outputarray = {}
 
   """
   outputarray = {}
@@ -212,7 +253,7 @@ def GetNodeInfo(vgname):
   outputarray['vg_size'] = vginfo['vg_size']
   outputarray['vg_free'] = vginfo['vg_free']
 
   outputarray['vg_size'] = vginfo['vg_size']
   outputarray['vg_free'] = vginfo['vg_free']
 
-  hyper = hypervisor.GetHypervisor()
+  hyper = hypervisor.GetHypervisor(hypervisor_type)
   hyp_info = hyper.GetNodeInfo()
   if hyp_info is not None:
     outputarray.update(hyp_info)
   hyp_info = hyper.GetNodeInfo()
   if hyp_info is not None:
     outputarray.update(hyp_info)
@@ -226,28 +267,39 @@ def GetNodeInfo(vgname):
   return outputarray
 
 
   return outputarray
 
 
-def VerifyNode(what):
+def VerifyNode(what, cluster_name):
   """Verify the status of the local node.
 
   """Verify the status of the local node.
 
-  Args:
-    what - a dictionary of things to check:
-      'filelist' : list of files for which to compute checksums
-      'nodelist' : list of nodes we should check communication with
-      'hypervisor': run the hypervisor-specific verify
+  Based on the input L{what} parameter, various checks are done on the
+  local node.
+
+  If the I{filelist} key is present, this list of
+  files is checksummed and the file/checksum pairs are returned.
 
 
-  Requested files on local node are checksummed and the result returned.
+  If the I{nodelist} key is present, we check that we have
+  connectivity via ssh with the target nodes (and check the hostname
+  report).
+
+  If the I{node-net-test} key is present, we check that we have
+  connectivity to the given nodes via both primary IP and, if
+  applicable, secondary IPs.
+
+  @type what: C{dict}
+  @param what: a dictionary of things to check:
+      - filelist: list of files for which to compute checksums
+      - nodelist: list of nodes we should check ssh communication with
+      - node-net-test: list of nodes we should check node daemon port
+        connectivity with
+      - hypervisor: list with hypervisors to run the verify for
 
 
-  The nodelist is traversed, with the following checks being made
-  for each node:
-  - known_hosts key correct
-  - correct resolving of node name (target node returns its own hostname
-    by ssh-execution of 'hostname', result compared against name in list.
 
   """
   result = {}
 
   if 'hypervisor' in what:
 
   """
   result = {}
 
   if 'hypervisor' in what:
-    result['hypervisor'] = hypervisor.GetHypervisor().Verify()
+    result['hypervisor'] = my_dict = {}
+    for hv_name in what['hypervisor']:
+      my_dict[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
 
   if 'filelist' in what:
     result['filelist'] = utils.FingerprintFiles(what['filelist'])
 
   if 'filelist' in what:
     result['filelist'] = utils.FingerprintFiles(what['filelist'])
@@ -256,7 +308,7 @@ def VerifyNode(what):
     result['nodelist'] = {}
     random.shuffle(what['nodelist'])
     for node in what['nodelist']:
     result['nodelist'] = {}
     random.shuffle(what['nodelist'])
     for node in what['nodelist']:
-      success, message = _GetSshRunner().VerifyNodeHostname(node)
+      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
       if not success:
         result['nodelist'][node] = message
   if 'node-net-test' in what:
       if not success:
         result['nodelist'][node] = message
   if 'node-net-test' in what:
@@ -273,7 +325,7 @@ def VerifyNode(what):
                                           " primary/secondary IP"
                                           " in the node list")
     else:
                                           " primary/secondary IP"
                                           " in the node list")
     else:
-      port = ssconf.SimpleStore().GetNodeDaemonPort()
+      port = utils.GetNodeDaemonPort()
       for name, pip, sip in what['node-net-test']:
         fail = []
         if not utils.TcpPing(pip, port, source=my_pip):
       for name, pip, sip in what['node-net-test']:
         fail = []
         if not utils.TcpPing(pip, port, source=my_pip):
@@ -377,41 +429,49 @@ def BridgesExist(bridges_list):
   return True
 
 
   return True
 
 
-def GetInstanceList():
+def GetInstanceList(hypervisor_list):
   """Provides a list of instances.
 
   """Provides a list of instances.
 
-  Returns:
-    A list of all running instances on the current node
-    - instance1.example.com
-    - instance2.example.com
+  @type hypervisor_list: list
+  @param hypervisor_list: the list of hypervisors to query information
+
+  @rtype: list
+  @return: a list of all running instances on the current node
+             - instance1.example.com
+             - instance2.example.com
 
   """
 
   """
-  try:
-    names = hypervisor.GetHypervisor().ListInstances()
-  except errors.HypervisorError, err:
-    logging.exception("Error enumerating instances")
-    raise
+  results = []
+  for hname in hypervisor_list:
+    try:
+      names = hypervisor.GetHypervisor(hname).ListInstances()
+      results.extend(names)
+    except errors.HypervisorError, err:
+      logging.exception("Error enumerating instances for hypevisor %s", hname)
+      # FIXME: should we somehow not propagate this to the master?
+      raise
 
 
-  return names
+  return results
 
 
 
 
-def GetInstanceInfo(instance):
+def GetInstanceInfo(instance, hname):
   """Gives back the informations about an instance as a dictionary.
 
   """Gives back the informations about an instance as a dictionary.
 
-  Args:
-    instance: name of the instance (ex. instance1.example.com)
+  @type instance: string
+  @param instance: the instance name
+  @type hname: string
+  @param hname: the hypervisor type of the instance
 
 
-  Returns:
-    { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
-    where
-    memory: memory size of instance (int)
-    state: xen state of instance (string)
-    time: cpu time of instance (float)
+  @rtype: dict
+  @return: dictionary with the following keys:
+      - memory: memory size of instance (int)
+      - state: xen state of instance (string)
+      - time: cpu time of instance (float)
 
   """
   output = {}
 
 
   """
   output = {}
 
-  iinfo = hypervisor.GetHypervisor().GetInstanceInfo(instance)
+  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
   if iinfo is not None:
     output['memory'] = iinfo[2]
     output['state'] = iinfo[4]
   if iinfo is not None:
     output['memory'] = iinfo[2]
     output['state'] = iinfo[4]
@@ -420,34 +480,38 @@ def GetInstanceInfo(instance):
   return output
 
 
   return output
 
 
-def GetAllInstancesInfo():
+def GetAllInstancesInfo(hypervisor_list):
   """Gather data about all instances.
 
   This is the equivalent of `GetInstanceInfo()`, except that it
   computes data for all instances at once, thus being faster if one
   needs data about more than one instance.
 
   """Gather data about all instances.
 
   This is the equivalent of `GetInstanceInfo()`, except that it
   computes data for all instances at once, thus being faster if one
   needs data about more than one instance.
 
-  Returns: a dictionary of dictionaries, keys being the instance name,
-    and with values:
-    { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
-    where
-    memory: memory size of instance (int)
-    state: xen state of instance (string)
-    time: cpu time of instance (float)
-    vcpus: the number of cpus
+  @type hypervisor_list: list
+  @param hypervisor_list: list of hypervisors to query for instance data
+
+  @rtype: dict of dicts
+  @return: dictionary of instance: data, with data having the following keys:
+      - memory: memory size of instance (int)
+      - state: xen state of instance (string)
+      - time: cpu time of instance (float)
+      - vcpuus: the number of vcpus
 
   """
   output = {}
 
 
   """
   output = {}
 
-  iinfo = hypervisor.GetHypervisor().GetAllInstancesInfo()
-  if iinfo:
-    for name, inst_id, memory, vcpus, state, times in iinfo:
-      output[name] = {
-        'memory': memory,
-        'vcpus': vcpus,
-        'state': state,
-        'time': times,
-        }
+  for hname in hypervisor_list:
+    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
+    if iinfo:
+      for name, inst_id, memory, vcpus, state, times in iinfo:
+        if name in output:
+          raise errors.HypervisorError("Instance %s running duplicate" % name)
+        output[name] = {
+          'memory': memory,
+          'vcpus': vcpus,
+          'state': state,
+          'time': times,
+          }
 
   return output
 
 
   return output
 
@@ -496,8 +560,9 @@ def AddOSToInstance(instance, os_disk, swap_disk):
                                 inst_os.path, create_script, instance.name,
                                 real_os_dev.dev_path, real_swap_dev.dev_path,
                                 logfile)
                                 inst_os.path, create_script, instance.name,
                                 real_os_dev.dev_path, real_swap_dev.dev_path,
                                 logfile)
+  env = {'HYPERVISOR': instance.hypervisor}
 
 
-  result = utils.RunCmd(command)
+  result = utils.RunCmd(command, env=env)
   if result.failed:
     logging.error("os create command '%s' returned error: %s, logfile: %s,"
                   " output: %s", command, result.fail_reason, logfile,
   if result.failed:
     logging.error("os create command '%s' returned error: %s, logfile: %s,"
                   " output: %s", command, result.fail_reason, logfile,
@@ -626,17 +691,19 @@ def _GatherBlockDevs(instance):
 def StartInstance(instance, extra_args):
   """Start an instance.
 
 def StartInstance(instance, extra_args):
   """Start an instance.
 
-  Args:
-    instance - name of instance to start.
+  @type instance: instance object
+  @param instance: the instance object
+  @rtype: boolean
+  @return: whether the startup was successful or not
 
   """
 
   """
-  running_instances = GetInstanceList()
+  running_instances = GetInstanceList([instance.hypervisor])
 
   if instance.name in running_instances:
     return True
 
   block_devices = _GatherBlockDevs(instance)
 
   if instance.name in running_instances:
     return True
 
   block_devices = _GatherBlockDevs(instance)
-  hyper = hypervisor.GetHypervisor()
+  hyper = hypervisor.GetHypervisor(instance.hypervisor)
 
   try:
     hyper.StartInstance(instance, block_devices, extra_args)
 
   try:
     hyper.StartInstance(instance, block_devices, extra_args)
@@ -650,16 +717,19 @@ def StartInstance(instance, extra_args):
 def ShutdownInstance(instance):
   """Shut an instance down.
 
 def ShutdownInstance(instance):
   """Shut an instance down.
 
-  Args:
-    instance - name of instance to shutdown.
+  @type instance: instance object
+  @param instance: the instance object
+  @rtype: boolean
+  @return: whether the startup was successful or not
 
   """
 
   """
-  running_instances = GetInstanceList()
+  hv_name = instance.hypervisor
+  running_instances = GetInstanceList([hv_name])
 
   if instance.name not in running_instances:
     return True
 
 
   if instance.name not in running_instances:
     return True
 
-  hyper = hypervisor.GetHypervisor()
+  hyper = hypervisor.GetHypervisor(hv_name)
   try:
     hyper.StopInstance(instance)
   except errors.HypervisorError, err:
   try:
     hyper.StopInstance(instance)
   except errors.HypervisorError, err:
@@ -671,7 +741,7 @@ def ShutdownInstance(instance):
 
   time.sleep(1)
   for dummy in range(11):
 
   time.sleep(1)
   for dummy in range(11):
-    if instance.name not in GetInstanceList():
+    if instance.name not in GetInstanceList([hv_name]):
       break
     time.sleep(10)
   else:
       break
     time.sleep(10)
   else:
@@ -685,7 +755,7 @@ def ShutdownInstance(instance):
       return False
 
     time.sleep(1)
       return False
 
     time.sleep(1)
-    if instance.name in GetInstanceList():
+    if instance.name in GetInstanceList([hv_name]):
       logging.error("could not shutdown instance '%s' even by destroy",
                     instance.name)
       return False
       logging.error("could not shutdown instance '%s' even by destroy",
                     instance.name)
       return False
@@ -701,13 +771,13 @@ def RebootInstance(instance, reboot_type, extra_args):
     reboot_type - how to reboot [soft,hard,full]
 
   """
     reboot_type - how to reboot [soft,hard,full]
 
   """
-  running_instances = GetInstanceList()
+  running_instances = GetInstanceList([instance.hypervisor])
 
   if instance.name not in running_instances:
     logging.error("Cannot reboot instance that is not running")
     return False
 
 
   if instance.name not in running_instances:
     logging.error("Cannot reboot instance that is not running")
     return False
 
-  hyper = hypervisor.GetHypervisor()
+  hyper = hypervisor.GetHypervisor(instance.hypervisor)
   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
     try:
       hyper.RebootInstance(instance)
   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
     try:
       hyper.RebootInstance(instance)
@@ -724,18 +794,29 @@ def RebootInstance(instance, reboot_type, extra_args):
   else:
     raise errors.ParameterError("reboot_type invalid")
 
   else:
     raise errors.ParameterError("reboot_type invalid")
 
-
   return True
 
 
 def MigrateInstance(instance, target, live):
   """Migrates an instance to another node.
 
   return True
 
 
 def MigrateInstance(instance, target, live):
   """Migrates an instance to another node.
 
+  @type instance: C{objects.Instance}
+  @param instance: the instance definition
+  @type target: string
+  @param target: the target node name
+  @type live: boolean
+  @param live: whether the migration should be done live or not (the
+      interpretation of this parameter is left to the hypervisor)
+  @rtype: tuple
+  @return: a tuple of (success, msg) where:
+      - succes is a boolean denoting the success/failure of the operation
+      - msg is a string with details in case of failure
+
   """
   """
-  hyper = hypervisor.GetHypervisor()
+  hyper = hypervisor.GetHypervisor(instance.hypervisor_name)
 
   try:
 
   try:
-    hyper.MigrateInstance(instance, target, live)
+    hyper.MigrateInstance(instance.name, target, live)
   except errors.HypervisorError, err:
     msg = "Failed to migrate instance: %s" % str(err)
     logging.error(msg)
   except errors.HypervisorError, err:
     msg = "Failed to migrate instance: %s" % str(err)
     logging.error(msg)
@@ -1036,9 +1117,8 @@ def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
     constants.ETC_HOSTS,
     constants.SSH_KNOWN_HOSTS_FILE,
     constants.VNC_PASSWORD_FILE,
     constants.ETC_HOSTS,
     constants.SSH_KNOWN_HOSTS_FILE,
     constants.VNC_PASSWORD_FILE,
-    constants.JOB_QUEUE_SERIAL_FILE,
     ]
     ]
-  allowed_files.extend(ssconf.SimpleStore().GetFileList())
+
   if file_name not in allowed_files:
     logging.error("Filename passed to UploadFile not in allowed"
                  " upload targets: '%s'", file_name)
   if file_name not in allowed_files:
     logging.error("Filename passed to UploadFile not in allowed"
                  " upload targets: '%s'", file_name)
@@ -1256,7 +1336,7 @@ def SnapshotBlockDevice(disk):
                                  (disk.unique_id, disk.dev_type))
 
 
                                  (disk.unique_id, disk.dev_type))
 
 
-def ExportSnapshot(disk, dest_node, instance):
+def ExportSnapshot(disk, dest_node, instance, cluster_name):
   """Export a block device snapshot to a remote node.
 
   Args:
   """Export a block device snapshot to a remote node.
 
   Args:
@@ -1297,8 +1377,9 @@ def ExportSnapshot(disk, dest_node, instance):
 
   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
                                 destdir, destdir, destfile)
 
   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
                                 destdir, destdir, destfile)
-  remotecmd = _GetSshRunner().BuildCmd(dest_node, constants.GANETI_RUNAS,
-                                       destcmd)
+  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
+                                                   constants.GANETI_RUNAS,
+                                                   destcmd)
 
   # all commands have been checked, so we're safe to combine them
   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
 
   # all commands have been checked, so we're safe to combine them
   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
@@ -1347,7 +1428,8 @@ def FinalizeExport(instance, snap_disks):
     config.set(constants.INISECT_INS, 'nic%d_mac' %
                nic_count, '%s' % nic.mac)
     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
     config.set(constants.INISECT_INS, 'nic%d_mac' %
                nic_count, '%s' % nic.mac)
     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
-    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count, '%s' % nic.bridge)
+    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
+               '%s' % nic.bridge)
   # TODO: redundant: on load can read nics until it doesn't exist
   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
 
   # TODO: redundant: on load can read nics until it doesn't exist
   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
 
@@ -1396,7 +1478,8 @@ def ExportInfo(dest):
   return config
 
 
   return config
 
 
-def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
+def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image,
+                         cluster_name):
   """Import an os image into an instance.
 
   Args:
   """Import an os image into an instance.
 
   Args:
@@ -1441,8 +1524,9 @@ def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
     os.mkdir(constants.LOG_OS_DIR, 0750)
 
   destcmd = utils.BuildShellCmd('cat %s', src_image)
     os.mkdir(constants.LOG_OS_DIR, 0750)
 
   destcmd = utils.BuildShellCmd('cat %s', src_image)
-  remotecmd = _GetSshRunner().BuildCmd(src_node, constants.GANETI_RUNAS,
-                                       destcmd)
+  remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
+                                                   constants.GANETI_RUNAS,
+                                                   destcmd)
 
   comprcmd = "gunzip"
   impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
 
   comprcmd = "gunzip"
   impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
@@ -1451,8 +1535,9 @@ def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
                                logfile)
 
   command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
                                logfile)
 
   command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
+  env = {'HYPERVISOR': instance.hypervisor}
 
 
-  result = utils.RunCmd(command)
+  result = utils.RunCmd(command, env=env)
 
   if result.failed:
     logging.error("os import command '%s' returned error: %s"
 
   if result.failed:
     logging.error("os import command '%s' returned error: %s"
@@ -1536,8 +1621,9 @@ def _TransformFileStorageDir(file_storage_dir):
     normalized file_storage_dir (string) if valid, None otherwise
 
   """
     normalized file_storage_dir (string) if valid, None otherwise
 
   """
+  cfg = _GetConfig()
   file_storage_dir = os.path.normpath(file_storage_dir)
   file_storage_dir = os.path.normpath(file_storage_dir)
-  base_file_storage_dir = ssconf.SimpleStore().GetFileStorageDir()
+  base_file_storage_dir = cfg.GetFileStorageDir()
   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
       base_file_storage_dir):
     logging.error("file storage directory '%s' is not under base file"
   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
       base_file_storage_dir):
     logging.error("file storage directory '%s' is not under base file"
@@ -1646,6 +1732,45 @@ def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
   return result
 
 
   return result
 
 
+def _IsJobQueueFile(file_name):
+  """Checks whether the given filename is in the queue directory.
+
+  """
+  queue_dir = os.path.normpath(constants.QUEUE_DIR)
+  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
+
+  if not result:
+    logging.error("'%s' is not a file in the queue directory",
+                  file_name)
+
+  return result
+
+
+def JobQueueUpdate(file_name, content):
+  """Updates a file in the queue directory.
+
+  """
+  if not _IsJobQueueFile(file_name):
+    return False
+
+  # Write and replace the file atomically
+  utils.WriteFile(file_name, data=content)
+
+  return True
+
+
+def JobQueueRename(old, new):
+  """Renames a job queue file.
+
+  """
+  if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
+    return False
+
+  os.rename(old, new)
+
+  return True
+
+
 def CloseBlockDevices(disks):
   """Closes the given block devices.
 
 def CloseBlockDevices(disks):
   """Closes the given block devices.