Convert http.py to use the logging module
[ganeti-local] / lib / backend.py
index b9e8a33..7e25c68 100644 (file)
@@ -30,8 +30,10 @@ import stat
 import errno
 import re
 import subprocess
+import random
+import logging
+import tempfile
 
-from ganeti import logger
 from ganeti import errors
 from ganeti import utils
 from ganeti import ssh
@@ -42,40 +44,129 @@ from ganeti import objects
 from ganeti import ssconf
 
 
-def _GetSshRunner():
-  return ssh.SshRunner()
+def _GetConfig():
+  return ssconf.SimpleConfigReader()
 
 
-def StartMaster():
-  """Activate local node as master node.
+def _GetSshRunner(cluster_name):
+  return ssh.SshRunner(cluster_name)
+
 
-  There are two needed steps for this:
-    - run the master script
-    - register the cron script
+def _CleanDirectory(path, exclude=[]):
+  """Removes all regular files in a directory.
+
+  @param exclude: List of files to be excluded.
+  @type exclude: list
 
   """
-  result = utils.RunCmd([constants.MASTER_SCRIPT, "-d", "start"])
+  if not os.path.isdir(path):
+    return
 
-  if result.failed:
-    logger.Error("could not activate cluster interface with command %s,"
-                 " error: '%s'" % (result.cmd, result.output))
-    return False
+  # Normalize excluded paths
+  exclude = [os.path.normpath(i) for i in exclude]
 
-  return True
+  for rel_name in utils.ListVisibleFiles(path):
+    full_name = os.path.normpath(os.path.join(path, rel_name))
+    if full_name in exclude:
+      continue
+    if os.path.isfile(full_name) and not os.path.islink(full_name):
+      utils.RemoveFile(full_name)
+
+
+def JobQueuePurge():
+  """Removes job queue files and archived jobs
+
+  """
+  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
+  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
+
+
+def GetMasterInfo():
+  """Returns master information.
 
+  This is an utility function to compute master information, either
+  for consumption here or from the node daemon.
 
-def StopMaster():
+  @rtype: tuple
+  @return: (master_netdev, master_ip, master_name)
+
+  """
+  try:
+    cfg = _GetConfig()
+    master_netdev = cfg.GetMasterNetdev()
+    master_ip = cfg.GetMasterIP()
+    master_node = cfg.GetMasterNode()
+  except errors.ConfigurationError, err:
+    logging.exception("Cluster configuration incomplete")
+    return (None, None)
+  return (master_netdev, master_ip, master_node)
+
+
+def StartMaster(start_daemons):
+  """Activate local node as master node.
+
+  The function will always try activate the IP address of the master
+  (if someone else has it, then it won't). Then, if the start_daemons
+  parameter is True, it will also start the master daemons
+  (ganet-masterd and ganeti-rapi).
+
+  """
+  ok = True
+  master_netdev, master_ip, _ = GetMasterInfo()
+  if not master_netdev:
+    return False
+
+  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
+    if utils.OwnIpAddress(master_ip):
+      # we already have the ip:
+      logging.debug("Already started")
+    else:
+      logging.error("Someone else has the master ip, not activating")
+      ok = False
+  else:
+    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
+                           "dev", master_netdev, "label",
+                           "%s:0" % master_netdev])
+    if result.failed:
+      logging.error("Can't activate master IP: %s", result.output)
+      ok = False
+
+    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
+                           "-s", master_ip, master_ip])
+    # we'll ignore the exit code of arping
+
+  # and now start the master and rapi daemons
+  if start_daemons:
+    for daemon in 'ganeti-masterd', 'ganeti-rapi':
+      result = utils.RunCmd([daemon])
+      if result.failed:
+        logging.error("Can't start daemon %s: %s", daemon, result.output)
+        ok = False
+  return ok
+
+
+def StopMaster(stop_daemons):
   """Deactivate this node as master.
 
-  This runs the master stop script.
+  The function will always try to deactivate the IP address of the
+  master. Then, if the stop_daemons parameter is True, it will also
+  stop the master daemons (ganet-masterd and ganeti-rapi).
 
   """
-  result = utils.RunCmd([constants.MASTER_SCRIPT, "-d", "stop"])
+  master_netdev, master_ip, _ = GetMasterInfo()
+  if not master_netdev:
+    return False
 
+  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
+                         "dev", master_netdev])
   if result.failed:
-    logger.Error("could not deactivate cluster interface with command %s,"
-                 " error: '%s'" % (result.cmd, result.output))
-    return False
+    logging.error("Can't remove the master IP, error: %s", result.output)
+    # but otherwise ignore the failure
+
+  if stop_daemons:
+    # stop/kill the rapi and the master daemon
+    for daemon in constants.RAPI_PID, constants.MASTERD_PID:
+      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
 
   return True
 
@@ -100,7 +191,7 @@ def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
                                                     mkdir=True)
   except errors.OpExecError, err:
-    logger.Error("Error while processing user ssh files: %s" % err)
+    logging.exception("Error while processing user ssh files")
     return False
 
   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
@@ -117,16 +208,13 @@ def LeaveCluster():
   """Cleans up the current node and prepares it to be removed from the cluster.
 
   """
-  if os.path.isdir(constants.DATA_DIR):
-    for rel_name in utils.ListVisibleFiles(constants.DATA_DIR):
-      full_name = os.path.join(constants.DATA_DIR, rel_name)
-      if os.path.isfile(full_name) and not os.path.islink(full_name):
-        utils.RemoveFile(full_name)
+  _CleanDirectory(constants.DATA_DIR)
+  JobQueuePurge()
 
   try:
     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
-  except errors.OpExecError, err:
-    logger.Error("Error while processing ssh files: %s" % err)
+  except errors.OpExecError:
+    logging.exception("Error while processing ssh files")
     return
 
   f = open(pub_key, 'r')
@@ -138,19 +226,25 @@ def LeaveCluster():
   utils.RemoveFile(priv_key)
   utils.RemoveFile(pub_key)
 
+  # Return a reassuring string to the caller, and quit
+  raise errors.QuitGanetiException(False, 'Shutdown scheduled')
 
-def GetNodeInfo(vgname):
+
+def GetNodeInfo(vgname, hypervisor_type):
   """Gives back a hash with different informations about the node.
 
-  Returns:
-    { 'vg_size' : xxx,  'vg_free' : xxx, 'memory_domain0': xxx,
-      'memory_free' : xxx, 'memory_total' : xxx }
-    where
-    vg_size is the size of the configured volume group in MiB
-    vg_free is the free size of the volume group in MiB
-    memory_dom0 is the memory allocated for domain0 in MiB
-    memory_free is the currently available (free) ram in MiB
-    memory_total is the total number of ram in MiB
+  @type vgname: C{string}
+  @param vgname: the name of the volume group to ask for disk space information
+  @type hypervisor_type: C{str}
+  @param hypervisor_type: the name of the hypervisor to ask for
+      memory information
+  @rtype: C{dict}
+  @return: dictionary with the following keys:
+      - vg_size is the size of the configured volume group in MiB
+      - vg_free is the free size of the volume group in MiB
+      - memory_dom0 is the memory allocated for domain0 in MiB
+      - memory_free is the currently available (free) ram in MiB
+      - memory_total is the total number of ram in MiB
 
   """
   outputarray = {}
@@ -158,7 +252,7 @@ def GetNodeInfo(vgname):
   outputarray['vg_size'] = vginfo['vg_size']
   outputarray['vg_free'] = vginfo['vg_free']
 
-  hyper = hypervisor.GetHypervisor()
+  hyper = hypervisor.GetHypervisor(hypervisor_type)
   hyp_info = hyper.GetNodeInfo()
   if hyp_info is not None:
     outputarray.update(hyp_info)
@@ -172,38 +266,76 @@ def GetNodeInfo(vgname):
   return outputarray
 
 
-def VerifyNode(what):
+def VerifyNode(what, cluster_name):
   """Verify the status of the local node.
 
-  Args:
-    what - a dictionary of things to check:
-      'filelist' : list of files for which to compute checksums
-      'nodelist' : list of nodes we should check communication with
-      'hypervisor': run the hypervisor-specific verify
+  Based on the input L{what} parameter, various checks are done on the
+  local node.
+
+  If the I{filelist} key is present, this list of
+  files is checksummed and the file/checksum pairs are returned.
+
+  If the I{nodelist} key is present, we check that we have
+  connectivity via ssh with the target nodes (and check the hostname
+  report).
 
-  Requested files on local node are checksummed and the result returned.
+  If the I{node-net-test} key is present, we check that we have
+  connectivity to the given nodes via both primary IP and, if
+  applicable, secondary IPs.
 
-  The nodelist is traversed, with the following checks being made
-  for each node:
-  - known_hosts key correct
-  - correct resolving of node name (target node returns its own hostname
-    by ssh-execution of 'hostname', result compared against name in list.
+  @type what: C{dict}
+  @param what: a dictionary of things to check:
+      - filelist: list of files for which to compute checksums
+      - nodelist: list of nodes we should check ssh communication with
+      - node-net-test: list of nodes we should check node daemon port
+        connectivity with
+      - hypervisor: list with hypervisors to run the verify for
 
   """
   result = {}
 
   if 'hypervisor' in what:
-    result['hypervisor'] = hypervisor.GetHypervisor().Verify()
+    result['hypervisor'] = my_dict = {}
+    for hv_name in what['hypervisor']:
+      my_dict[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
 
   if 'filelist' in what:
     result['filelist'] = utils.FingerprintFiles(what['filelist'])
 
   if 'nodelist' in what:
     result['nodelist'] = {}
+    random.shuffle(what['nodelist'])
     for node in what['nodelist']:
-      success, message = _GetSshRunner().VerifyNodeHostname(node)
+      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
       if not success:
         result['nodelist'][node] = message
+  if 'node-net-test' in what:
+    result['node-net-test'] = {}
+    my_name = utils.HostInfo().name
+    my_pip = my_sip = None
+    for name, pip, sip in what['node-net-test']:
+      if name == my_name:
+        my_pip = pip
+        my_sip = sip
+        break
+    if not my_pip:
+      result['node-net-test'][my_name] = ("Can't find my own"
+                                          " primary/secondary IP"
+                                          " in the node list")
+    else:
+      port = utils.GetNodeDaemonPort()
+      for name, pip, sip in what['node-net-test']:
+        fail = []
+        if not utils.TcpPing(pip, port, source=my_pip):
+          fail.append("primary")
+        if sip != pip:
+          if not utils.TcpPing(sip, port, source=my_sip):
+            fail.append("secondary")
+        if fail:
+          result['node-net-test'][name] = ("failure using the %s"
+                                           " interface(s)" %
+                                           " and ".join(fail))
+
   return result
 
 
@@ -222,15 +354,18 @@ def GetVolumeList(vg_name):
                          "--separator=%s" % sep,
                          "-olv_name,lv_size,lv_attr", vg_name])
   if result.failed:
-    logger.Error("Failed to list logical volumes, lvs output: %s" %
-                 result.output)
+    logging.error("Failed to list logical volumes, lvs output: %s",
+                  result.output)
     return result.output
 
+  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
   for line in result.stdout.splitlines():
-    line = line.strip().rstrip(sep)
-    name, size, attr = line.split(sep)
-    if len(attr) != 6:
-      attr = '------'
+    line = line.strip()
+    match = valid_line_re.match(line)
+    if not match:
+      logging.error("Invalid line returned from lvs output: '%s'", line)
+      continue
+    name, size, attr = match.groups()
     inactive = attr[4] == '-'
     online = attr[5] == 'o'
     lvs[name] = (size, inactive, online)
@@ -256,8 +391,8 @@ def NodeVolumes():
                          "--separator=|",
                          "--options=lv_name,lv_size,devices,vg_name"])
   if result.failed:
-    logger.Error("Failed to list logical volumes, lvs output: %s" %
-                 result.output)
+    logging.error("Failed to list logical volumes, lvs output: %s",
+                  result.output)
     return {}
 
   def parse_dev(dev):
@@ -274,7 +409,8 @@ def NodeVolumes():
       'vg': line[3].strip(),
     }
 
-  return [map_line(line.split('|')) for line in result.stdout.splitlines()]
+  return [map_line(line.split('|')) for line in result.stdout.splitlines()
+          if line.count('|') >= 3]
 
 
 def BridgesExist(bridges_list):
@@ -291,41 +427,49 @@ def BridgesExist(bridges_list):
   return True
 
 
-def GetInstanceList():
+def GetInstanceList(hypervisor_list):
   """Provides a list of instances.
 
-  Returns:
-    A list of all running instances on the current node
-    - instance1.example.com
-    - instance2.example.com
+  @type hypervisor_list: list
+  @param hypervisor_list: the list of hypervisors to query information
+
+  @rtype: list
+  @return: a list of all running instances on the current node
+             - instance1.example.com
+             - instance2.example.com
 
   """
-  try:
-    names = hypervisor.GetHypervisor().ListInstances()
-  except errors.HypervisorError, err:
-    logger.Error("error enumerating instances: %s" % str(err))
-    raise
+  results = []
+  for hname in hypervisor_list:
+    try:
+      names = hypervisor.GetHypervisor(hname).ListInstances()
+      results.extend(names)
+    except errors.HypervisorError, err:
+      logging.exception("Error enumerating instances for hypevisor %s", hname)
+      # FIXME: should we somehow not propagate this to the master?
+      raise
 
-  return names
+  return results
 
 
-def GetInstanceInfo(instance):
+def GetInstanceInfo(instance, hname):
   """Gives back the informations about an instance as a dictionary.
 
-  Args:
-    instance: name of the instance (ex. instance1.example.com)
+  @type instance: string
+  @param instance: the instance name
+  @type hname: string
+  @param hname: the hypervisor type of the instance
 
-  Returns:
-    { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
-    where
-    memory: memory size of instance (int)
-    state: xen state of instance (string)
-    time: cpu time of instance (float)
+  @rtype: dict
+  @return: dictionary with the following keys:
+      - memory: memory size of instance (int)
+      - state: xen state of instance (string)
+      - time: cpu time of instance (float)
 
   """
   output = {}
 
-  iinfo = hypervisor.GetHypervisor().GetInstanceInfo(instance)
+  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
   if iinfo is not None:
     output['memory'] = iinfo[2]
     output['state'] = iinfo[4]
@@ -334,128 +478,88 @@ def GetInstanceInfo(instance):
   return output
 
 
-def GetAllInstancesInfo():
+def GetAllInstancesInfo(hypervisor_list):
   """Gather data about all instances.
 
   This is the equivalent of `GetInstanceInfo()`, except that it
   computes data for all instances at once, thus being faster if one
   needs data about more than one instance.
 
-  Returns: a dictionary of dictionaries, keys being the instance name,
-    and with values:
-    { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
-    where
-    memory: memory size of instance (int)
-    state: xen state of instance (string)
-    time: cpu time of instance (float)
-    vcpus: the number of cpus
+  @type hypervisor_list: list
+  @param hypervisor_list: list of hypervisors to query for instance data
+
+  @rtype: dict of dicts
+  @return: dictionary of instance: data, with data having the following keys:
+      - memory: memory size of instance (int)
+      - state: xen state of instance (string)
+      - time: cpu time of instance (float)
+      - vcpuus: the number of vcpus
 
   """
   output = {}
 
-  iinfo = hypervisor.GetHypervisor().GetAllInstancesInfo()
-  if iinfo:
-    for name, inst_id, memory, vcpus, state, times in iinfo:
-      output[name] = {
-        'memory': memory,
-        'vcpus': vcpus,
-        'state': state,
-        'time': times,
-        }
+  for hname in hypervisor_list:
+    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
+    if iinfo:
+      for name, inst_id, memory, vcpus, state, times in iinfo:
+        value = {
+          'memory': memory,
+          'vcpus': vcpus,
+          'state': state,
+          'time': times,
+          }
+        if name in output and output[name] != value:
+          raise errors.HypervisorError("Instance %s running duplicate"
+                                       " with different parameters" % name)
+        output[name] = value
 
   return output
 
 
-def AddOSToInstance(instance, os_disk, swap_disk):
+def AddOSToInstance(instance):
   """Add an OS to an instance.
 
-  Args:
-    instance: the instance object
-    os_disk: the instance-visible name of the os device
-    swap_disk: the instance-visible name of the swap device
+  @type instance: L{objects.Instance}
+  @param instance: Instance whose OS is to be installed
 
   """
   inst_os = OSFromDisk(instance.os)
 
   create_script = inst_os.create_script
-
-  os_device = instance.FindDisk(os_disk)
-  if os_device is None:
-    logger.Error("Can't find this device-visible name '%s'" % os_disk)
-    return False
-
-  swap_device = instance.FindDisk(swap_disk)
-  if swap_device is None:
-    logger.Error("Can't find this device-visible name '%s'" % swap_disk)
-    return False
-
-  real_os_dev = _RecursiveFindBD(os_device)
-  if real_os_dev is None:
-    raise errors.BlockDeviceError("Block device '%s' is not set up" %
-                                  str(os_device))
-  real_os_dev.Open()
-
-  real_swap_dev = _RecursiveFindBD(swap_device)
-  if real_swap_dev is None:
-    raise errors.BlockDeviceError("Block device '%s' is not set up" %
-                                  str(swap_device))
-  real_swap_dev.Open()
+  create_env = OSEnvironment(instance)
 
   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                      instance.name, int(time.time()))
   if not os.path.exists(constants.LOG_OS_DIR):
     os.mkdir(constants.LOG_OS_DIR, 0750)
 
-  command = utils.BuildShellCmd("cd %s && %s -i %s -b %s -s %s &>%s",
-                                inst_os.path, create_script, instance.name,
-                                real_os_dev.dev_path, real_swap_dev.dev_path,
-                                logfile)
+  command = utils.BuildShellCmd("cd %s && %s &>%s",
+                                inst_os.path, create_script, logfile)
 
-  result = utils.RunCmd(command)
+  result = utils.RunCmd(command, env=create_env)
   if result.failed:
-    logger.Error("os create command '%s' returned error: %s, logfile: %s,"
-                 " output: %s" %
-                 (command, result.fail_reason, logfile, result.output))
+    logging.error("os create command '%s' returned error: %s, logfile: %s,"
+                  " output: %s", command, result.fail_reason, logfile,
+                  result.output)
     return False
 
   return True
 
 
-def RunRenameInstance(instance, old_name, os_disk, swap_disk):
+def RunRenameInstance(instance, old_name):
   """Run the OS rename script for an instance.
 
-  Args:
-    instance: the instance object
-    old_name: the old name of the instance
-    os_disk: the instance-visible name of the os device
-    swap_disk: the instance-visible name of the swap device
+  @type instance: objects.Instance
+  @param instance: Instance whose OS is to be installed
+  @type old_name: string
+  @param old_name: previous instance name
 
   """
   inst_os = OSFromDisk(instance.os)
 
   script = inst_os.rename_script
-
-  os_device = instance.FindDisk(os_disk)
-  if os_device is None:
-    logger.Error("Can't find this device-visible name '%s'" % os_disk)
-    return False
-
-  swap_device = instance.FindDisk(swap_disk)
-  if swap_device is None:
-    logger.Error("Can't find this device-visible name '%s'" % swap_disk)
-    return False
-
-  real_os_dev = _RecursiveFindBD(os_device)
-  if real_os_dev is None:
-    raise errors.BlockDeviceError("Block device '%s' is not set up" %
-                                  str(os_device))
-  real_os_dev.Open()
-
-  real_swap_dev = _RecursiveFindBD(swap_device)
-  if real_swap_dev is None:
-    raise errors.BlockDeviceError("Block device '%s' is not set up" %
-                                  str(swap_device))
-  real_swap_dev.Open()
+  rename_env = OSEnvironment(instance)
+  rename_env['OLD_INSTANCE_NAME'] = old_name
 
   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                            old_name,
@@ -463,17 +567,14 @@ def RunRenameInstance(instance, old_name, os_disk, swap_disk):
   if not os.path.exists(constants.LOG_OS_DIR):
     os.mkdir(constants.LOG_OS_DIR, 0750)
 
-  command = utils.BuildShellCmd("cd %s && %s -o %s -n %s -b %s -s %s &>%s",
-                                inst_os.path, script, old_name, instance.name,
-                                real_os_dev.dev_path, real_swap_dev.dev_path,
-                                logfile)
+  command = utils.BuildShellCmd("cd %s && %s &>%s",
+                                inst_os.path, script, logfile)
 
-  result = utils.RunCmd(command)
+  result = utils.RunCmd(command, env=rename_env)
 
   if result.failed:
-    logger.Error("os create command '%s' returned error: %s"
-                 " output: %s" %
-                 (command, result.fail_reason, result.output))
+    logging.error("os create command '%s' returned error: %s output: %s",
+                  command, result.fail_reason, result.output)
     return False
 
   return True
@@ -502,8 +603,7 @@ def _GetVGInfo(vg_name):
                          "--nosuffix", "--units=m", "--separator=:", vg_name])
 
   if retval.failed:
-    errmsg = "volume group %s not present" % vg_name
-    logger.Error(errmsg)
+    logging.error("volume group %s not present", vg_name)
     return retdic
   valarr = retval.stdout.strip().rstrip(':').split(':')
   if len(valarr) == 3:
@@ -514,10 +614,10 @@ def _GetVGInfo(vg_name):
         "pv_count": int(valarr[2]),
         }
     except ValueError, err:
-      logger.Error("Fail to parse vgs output: %s" % str(err))
+      logging.exception("Fail to parse vgs output")
   else:
-    logger.Error("vgs output has the wrong number of fields (expected"
-                 " three): %s" % str(valarr))
+    logging.error("vgs output has the wrong number of fields (expected"
+                  " three): %s", str(valarr))
   return retdic
 
 
@@ -542,22 +642,24 @@ def _GatherBlockDevs(instance):
 def StartInstance(instance, extra_args):
   """Start an instance.
 
-  Args:
-    instance - name of instance to start.
+  @type instance: instance object
+  @param instance: the instance object
+  @rtype: boolean
+  @return: whether the startup was successful or not
 
   """
-  running_instances = GetInstanceList()
+  running_instances = GetInstanceList([instance.hypervisor])
 
   if instance.name in running_instances:
     return True
 
   block_devices = _GatherBlockDevs(instance)
-  hyper = hypervisor.GetHypervisor()
+  hyper = hypervisor.GetHypervisor(instance.hypervisor)
 
   try:
     hyper.StartInstance(instance, block_devices, extra_args)
   except errors.HypervisorError, err:
-    logger.Error("Failed to start instance: %s" % err)
+    logging.exception("Failed to start instance")
     return False
 
   return True
@@ -566,20 +668,23 @@ def StartInstance(instance, extra_args):
 def ShutdownInstance(instance):
   """Shut an instance down.
 
-  Args:
-    instance - name of instance to shutdown.
+  @type instance: instance object
+  @param instance: the instance object
+  @rtype: boolean
+  @return: whether the startup was successful or not
 
   """
-  running_instances = GetInstanceList()
+  hv_name = instance.hypervisor
+  running_instances = GetInstanceList([hv_name])
 
   if instance.name not in running_instances:
     return True
 
-  hyper = hypervisor.GetHypervisor()
+  hyper = hypervisor.GetHypervisor(hv_name)
   try:
     hyper.StopInstance(instance)
   except errors.HypervisorError, err:
-    logger.Error("Failed to stop instance: %s" % err)
+    logging.error("Failed to stop instance")
     return False
 
   # test every 10secs for 2min
@@ -587,22 +692,23 @@ def ShutdownInstance(instance):
 
   time.sleep(1)
   for dummy in range(11):
-    if instance.name not in GetInstanceList():
+    if instance.name not in GetInstanceList([hv_name]):
       break
     time.sleep(10)
   else:
     # the shutdown did not succeed
-    logger.Error("shutdown of '%s' unsuccessful, using destroy" % instance)
+    logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
 
     try:
       hyper.StopInstance(instance, force=True)
     except errors.HypervisorError, err:
-      logger.Error("Failed to stop instance: %s" % err)
+      logging.exception("Failed to stop instance")
       return False
 
     time.sleep(1)
-    if instance.name in GetInstanceList():
-      logger.Error("could not shutdown instance '%s' even by destroy")
+    if instance.name in GetInstanceList([hv_name]):
+      logging.error("could not shutdown instance '%s' even by destroy",
+                    instance.name)
       return False
 
   return True
@@ -616,33 +722,59 @@ def RebootInstance(instance, reboot_type, extra_args):
     reboot_type - how to reboot [soft,hard,full]
 
   """
-  running_instances = GetInstanceList()
+  running_instances = GetInstanceList([instance.hypervisor])
 
   if instance.name not in running_instances:
-    logger.Error("Cannot reboot instance that is not running")
+    logging.error("Cannot reboot instance that is not running")
     return False
 
-  hyper = hypervisor.GetHypervisor()
+  hyper = hypervisor.GetHypervisor(instance.hypervisor)
   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
     try:
       hyper.RebootInstance(instance)
     except errors.HypervisorError, err:
-      logger.Error("Failed to soft reboot instance: %s" % err)
+      logging.exception("Failed to soft reboot instance")
       return False
   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
     try:
       ShutdownInstance(instance)
       StartInstance(instance, extra_args)
     except errors.HypervisorError, err:
-      logger.Error("Failed to hard reboot instance: %s" % err)
+      logging.exception("Failed to hard reboot instance")
       return False
   else:
     raise errors.ParameterError("reboot_type invalid")
 
-
   return True
 
 
+def MigrateInstance(instance, target, live):
+  """Migrates an instance to another node.
+
+  @type instance: C{objects.Instance}
+  @param instance: the instance definition
+  @type target: string
+  @param target: the target node name
+  @type live: boolean
+  @param live: whether the migration should be done live or not (the
+      interpretation of this parameter is left to the hypervisor)
+  @rtype: tuple
+  @return: a tuple of (success, msg) where:
+      - succes is a boolean denoting the success/failure of the operation
+      - msg is a string with details in case of failure
+
+  """
+  hyper = hypervisor.GetHypervisor(instance.hypervisor_name)
+
+  try:
+    hyper.MigrateInstance(instance.name, target, live)
+  except errors.HypervisorError, err:
+    msg = "Failed to migrate instance: %s" % str(err)
+    logging.error(msg)
+    return (False, msg)
+  return (True, "Migration successfull")
+
+
 def CreateBlockDevice(disk, size, owner, on_primary, info):
   """Creates a block device for an instance.
 
@@ -671,7 +803,7 @@ def CreateBlockDevice(disk, size, owner, on_primary, info):
   try:
     device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
     if device is not None:
-      logger.Info("removing existing device %s" % disk)
+      logging.info("removing existing device %s", disk)
       device.Remove()
   except errors.BlockDeviceError, err:
     pass
@@ -684,7 +816,7 @@ def CreateBlockDevice(disk, size, owner, on_primary, info):
   if on_primary or disk.AssembleOnSecondary():
     if not device.Assemble():
       errorstring = "Can't assemble device after creation"
-      logger.Error(errorstring)
+      logging.error(errorstring)
       raise errors.BlockDeviceError("%s, very unusual event - check the node"
                                     " daemon logs" % errorstring)
     device.SetSyncSpeed(constants.SYNC_SPEED)
@@ -711,7 +843,7 @@ def RemoveBlockDevice(disk):
     rdev = _RecursiveFindBD(disk, allow_partial=True)
   except errors.BlockDeviceError, err:
     # probably can't attach
-    logger.Info("Can't attach to device %s in remove" % disk)
+    logging.info("Can't attach to device %s in remove", disk)
     rdev = None
   if rdev is not None:
     r_path = rdev.dev_path
@@ -757,7 +889,7 @@ def _RecursiveAssembleBD(disk, owner, as_primary):
         if children.count(None) >= mcn:
           raise
         cdev = None
-        logger.Debug("Error in child activation: %s" % str(err))
+        logging.debug("Error in child activation: %s", str(err))
       children.append(cdev)
 
   if as_primary or disk.AssembleOnSecondary():
@@ -821,12 +953,12 @@ def MirrorAddChildren(parent_cdev, new_cdevs):
   """
   parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
   if parent_bdev is None:
-    logger.Error("Can't find parent device")
+    logging.error("Can't find parent device")
     return False
   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
   if new_bdevs.count(None) > 0:
-    logger.Error("Can't find new device(s) to add: %s:%s" %
-                 (new_bdevs, new_cdevs))
+    logging.error("Can't find new device(s) to add: %s:%s",
+                  new_bdevs, new_cdevs)
     return False
   parent_bdev.AddChildren(new_bdevs)
   return True
@@ -838,7 +970,7 @@ def MirrorRemoveChildren(parent_cdev, new_cdevs):
   """
   parent_bdev = _RecursiveFindBD(parent_cdev)
   if parent_bdev is None:
-    logger.Error("Can't find parent in remove children: %s" % parent_cdev)
+    logging.error("Can't find parent in remove children: %s", parent_cdev)
     return False
   devs = []
   for disk in new_cdevs:
@@ -846,8 +978,8 @@ def MirrorRemoveChildren(parent_cdev, new_cdevs):
     if rpath is None:
       bd = _RecursiveFindBD(disk)
       if bd is None:
-        logger.Error("Can't find dynamic device %s while removing children" %
-                     disk)
+        logging.error("Can't find dynamic device %s while removing children",
+                      disk)
         return False
       else:
         devs.append(bd.dev_path)
@@ -927,19 +1059,20 @@ def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
 
   """
   if not os.path.isabs(file_name):
-    logger.Error("Filename passed to UploadFile is not absolute: '%s'" %
-                 file_name)
+    logging.error("Filename passed to UploadFile is not absolute: '%s'",
+                  file_name)
     return False
 
   allowed_files = [
     constants.CLUSTER_CONF_FILE,
     constants.ETC_HOSTS,
     constants.SSH_KNOWN_HOSTS_FILE,
+    constants.VNC_PASSWORD_FILE,
     ]
-  allowed_files.extend(ssconf.SimpleStore().GetFileList())
+
   if file_name not in allowed_files:
-    logger.Error("Filename passed to UploadFile not in allowed"
-                 " upload targets: '%s'" % file_name)
+    logging.error("Filename passed to UploadFile not in allowed"
+                 " upload targets: '%s'", file_name)
     return False
 
   utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
@@ -962,28 +1095,6 @@ def _ErrnoOrStr(err):
   return detail
 
 
-def _OSSearch(name, search_path=None):
-  """Search for OSes with the given name in the search_path.
-
-  Args:
-    name: The name of the OS to look for
-    search_path: List of dirs to search (defaults to constants.OS_SEARCH_PATH)
-
-  Returns:
-    The base_dir the OS resides in
-
-  """
-  if search_path is None:
-    search_path = constants.OS_SEARCH_PATH
-
-  for dir_name in search_path:
-    t_os_dir = os.path.sep.join([dir_name, name])
-    if os.path.isdir(t_os_dir):
-      return dir_name
-
-  return None
-
-
 def _OSOndiskVersion(name, os_dir):
   """Compute and return the API version of a given OS.
 
@@ -1009,21 +1120,21 @@ def _OSOndiskVersion(name, os_dir):
   try:
     f = open(api_file)
     try:
-      api_version = f.read(256)
+      api_versions = f.readlines()
     finally:
       f.close()
   except EnvironmentError, err:
     raise errors.InvalidOS(name, os_dir, "error while reading the"
                            " API version (%s)" % _ErrnoOrStr(err))
 
-  api_version = api_version.strip()
+  api_versions = [version.strip() for version in api_versions]
   try:
-    api_version = int(api_version)
+    api_versions = [int(version) for version in api_versions]
   except (TypeError, ValueError), err:
     raise errors.InvalidOS(name, os_dir,
                            "API version is not integer (%s)" % str(err))
 
-  return api_version
+  return api_versions
 
 
 def DiagnoseOS(top_dirs=None):
@@ -1045,8 +1156,7 @@ def DiagnoseOS(top_dirs=None):
       try:
         f_names = utils.ListVisibleFiles(dir_name)
       except EnvironmentError, err:
-        logger.Error("Can't list the OS directory %s: %s" %
-                     (dir_name, str(err)))
+        logging.exception("Can't list the OS directory %s", dir_name)
         break
       for name in f_names:
         try:
@@ -1066,28 +1176,28 @@ def OSFromDisk(name, base_dir=None):
   `errors.InvalidOS` exception, detailing why this is not a valid
   OS.
 
-  Args:
-    os_dir: Directory containing the OS scripts. Defaults to a search
-            in all the OS_SEARCH_PATH directories.
+  @type base_dir: string
+  @keyword base_dir: Base directory containing OS installations.
+                     Defaults to a search in all the OS_SEARCH_PATH dirs.
 
   """
 
   if base_dir is None:
-    base_dir = _OSSearch(name)
-
-  if base_dir is None:
-    raise errors.InvalidOS(name, None, "OS dir not found in search path")
+    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
+    if os_dir is None:
+      raise errors.InvalidOS(name, None, "OS dir not found in search path")
+  else:
+    os_dir = os.path.sep.join([base_dir, name])
 
-  os_dir = os.path.sep.join([base_dir, name])
-  api_version = _OSOndiskVersion(name, os_dir)
+  api_versions = _OSOndiskVersion(name, os_dir)
 
-  if api_version != constants.OS_API_VERSION:
+  if constants.OS_API_VERSION not in api_versions:
     raise errors.InvalidOS(name, os_dir, "API version mismatch"
                            " (found %s want %s)"
-                           % (api_version, constants.OS_API_VERSION))
+                           % (api_versions, constants.OS_API_VERSION))
 
   # OS Scripts dictionary, we will populate it with the actual script names
-  os_scripts = {'create': '', 'export': '', 'import': '', 'rename': ''}
+  os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
 
   for script in os_scripts:
     os_scripts[script] = os.path.sep.join([os_dir, script])
@@ -1108,11 +1218,82 @@ def OSFromDisk(name, base_dir=None):
 
 
   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
-                    create_script=os_scripts['create'],
-                    export_script=os_scripts['export'],
-                    import_script=os_scripts['import'],
-                    rename_script=os_scripts['rename'],
-                    api_version=api_version)
+                    create_script=os_scripts[constants.OS_SCRIPT_CREATE],
+                    export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
+                    import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
+                    rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
+                    api_versions=api_versions)
+
+def OSEnvironment(instance, debug=0):
+  """Calculate the environment for an os script.
+
+  @type instance: instance object
+  @param instance: target instance for the os script run
+  @type debug: integer
+  @param debug: debug level (0 or 1, for os api 10)
+  @rtype: dict
+  @return: dict of environment variables
+
+  """
+  result = {}
+  result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
+  result['INSTANCE_NAME'] = instance.name
+  result['HYPERVISOR'] = instance.hypervisor
+  result['DISK_COUNT'] = '%d' % len(instance.disks)
+  result['NIC_COUNT'] = '%d' % len(instance.nics)
+  result['DEBUG_LEVEL'] = '%d' % debug
+  for idx, disk in enumerate(instance.disks):
+    real_disk = _RecursiveFindBD(disk)
+    if real_disk is None:
+      raise errors.BlockDeviceError("Block device '%s' is not set up" %
+                                    str(disk))
+    real_disk.Open()
+    result['DISK_%d_PATH' % idx] = real_disk.dev_path
+    # FIXME: When disks will have read-only mode, populate this
+    result['DISK_%d_ACCESS' % idx] = 'W'
+    if constants.HV_DISK_TYPE in instance.hvparams:
+      result['DISK_%d_FRONTEND_TYPE' % idx] = \
+        instance.hvparams[constants.HV_DISK_TYPE]
+    if disk.dev_type in constants.LDS_BLOCK:
+      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
+    elif disk.dev_type == constants.LD_FILE:
+      result['DISK_%d_BACKEND_TYPE' % idx] = \
+        'file:%s' % disk.physical_id[0]
+  for idx, nic in enumerate(instance.nics):
+    result['NIC_%d_MAC' % idx] = nic.mac
+    if nic.ip:
+      result['NIC_%d_IP' % idx] = nic.ip
+    result['NIC_%d_BRIDGE' % idx] = nic.bridge
+    if constants.HV_NIC_TYPE in instance.hvparams:
+      result['NIC_%d_FRONTEND_TYPE' % idx] = \
+        instance.hvparams[constants.HV_NIC_TYPE]
+
+  return result
+
+def GrowBlockDevice(disk, amount):
+  """Grow a stack of block devices.
+
+  This function is called recursively, with the childrens being the
+  first one resize.
+
+  Args:
+    disk: the disk to be grown
+
+  Returns: a tuple of (status, result), with:
+    status: the result (true/false) of the operation
+    result: the error message if the operation failed, otherwise not used
+
+  """
+  r_dev = _RecursiveFindBD(disk)
+  if r_dev is None:
+    return False, "Cannot find block device %s" % (disk,)
+
+  try:
+    r_dev.Grow(amount)
+  except errors.BlockDeviceError, err:
+    return False, str(err)
+
+  return True, None
 
 
 def SnapshotBlockDevice(disk):
@@ -1151,7 +1332,7 @@ def SnapshotBlockDevice(disk):
                                  (disk.unique_id, disk.dev_type))
 
 
-def ExportSnapshot(disk, dest_node, instance):
+def ExportSnapshot(disk, dest_node, instance, cluster_name):
   """Export a block device snapshot to a remote node.
 
   Args:
@@ -1163,6 +1344,10 @@ def ExportSnapshot(disk, dest_node, instance):
     True if successful, False otherwise.
 
   """
+  # TODO(ultrotter): Import/Export still to be converted to OS API 10
+  logging.error("Import/Export still to be converted to OS API 10")
+  return False
+
   inst_os = OSFromDisk(instance.os)
   export_script = inst_os.export_script
 
@@ -1192,8 +1377,9 @@ def ExportSnapshot(disk, dest_node, instance):
 
   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
                                 destdir, destdir, destfile)
-  remotecmd = _GetSshRunner().BuildCmd(dest_node, constants.GANETI_RUNAS,
-                                       destcmd)
+  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
+                                                   constants.GANETI_RUNAS,
+                                                   destcmd)
 
   # all commands have been checked, so we're safe to combine them
   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
@@ -1201,9 +1387,8 @@ def ExportSnapshot(disk, dest_node, instance):
   result = utils.RunCmd(command)
 
   if result.failed:
-    logger.Error("os snapshot export command '%s' returned error: %s"
-                 " output: %s" %
-                 (command, result.fail_reason, result.output))
+    logging.error("os snapshot export command '%s' returned error: %s"
+                  " output: %s", command, result.fail_reason, result.output)
     return False
 
   return True
@@ -1234,17 +1419,23 @@ def FinalizeExport(instance, snap_disks):
 
   config.add_section(constants.INISECT_INS)
   config.set(constants.INISECT_INS, 'name', instance.name)
-  config.set(constants.INISECT_INS, 'memory', '%d' % instance.memory)
-  config.set(constants.INISECT_INS, 'vcpus', '%d' % instance.vcpus)
+  config.set(constants.INISECT_INS, 'memory', '%d' %
+             instance.beparams[constants.BE_MEMORY])
+  config.set(constants.INISECT_INS, 'vcpus', '%d' %
+             instance.beparams[constants.BE_VCPUS])
   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
+
+  nic_count = 0
   for nic_count, nic in enumerate(instance.nics):
     config.set(constants.INISECT_INS, 'nic%d_mac' %
                nic_count, '%s' % nic.mac)
     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
-    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count, '%s' % nic.bridge)
+    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
+               '%s' % nic.bridge)
   # TODO: redundant: on load can read nics until it doesn't exist
   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
 
+  disk_count = 0
   for disk_count, disk in enumerate(snap_disks):
     config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
                ('%s' % disk.iv_name))
@@ -1289,7 +1480,8 @@ def ExportInfo(dest):
   return config
 
 
-def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
+def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image,
+                         cluster_name):
   """Import an os image into an instance.
 
   Args:
@@ -1303,17 +1495,21 @@ def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
     False in case of error, True otherwise.
 
   """
+  # TODO(ultrotter): Import/Export still to be converted to OS API 10
+  logging.error("Import/Export still to be converted to OS API 10")
+  return False
+
   inst_os = OSFromDisk(instance.os)
   import_script = inst_os.import_script
 
   os_device = instance.FindDisk(os_disk)
   if os_device is None:
-    logger.Error("Can't find this device-visible name '%s'" % os_disk)
+    logging.error("Can't find this device-visible name '%s'", os_disk)
     return False
 
   swap_device = instance.FindDisk(swap_disk)
   if swap_device is None:
-    logger.Error("Can't find this device-visible name '%s'" % swap_disk)
+    logging.error("Can't find this device-visible name '%s'", swap_disk)
     return False
 
   real_os_dev = _RecursiveFindBD(os_device)
@@ -1334,8 +1530,9 @@ def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
     os.mkdir(constants.LOG_OS_DIR, 0750)
 
   destcmd = utils.BuildShellCmd('cat %s', src_image)
-  remotecmd = _GetSshRunner().BuildCmd(src_node, constants.GANETI_RUNAS,
-                                       destcmd)
+  remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
+                                                   constants.GANETI_RUNAS,
+                                                   destcmd)
 
   comprcmd = "gunzip"
   impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
@@ -1344,13 +1541,13 @@ def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
                                logfile)
 
   command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
+  env = {'HYPERVISOR': instance.hypervisor}
 
-  result = utils.RunCmd(command)
+  result = utils.RunCmd(command, env=env)
 
   if result.failed:
-    logger.Error("os import command '%s' returned error: %s"
-                 " output: %s" %
-                 (command, result.fail_reason, result.output))
+    logging.error("os import command '%s' returned error: %s"
+                  " output: %s", command, result.fail_reason, result.output)
     return False
 
   return True
@@ -1411,8 +1608,7 @@ def RenameBlockDevices(devlist):
         # cache? for now, we only lose lvm data when we rename, which
         # is less critical than DRBD or MD
     except errors.BlockDeviceError, err:
-      logger.Error("Can't rename device '%s' to '%s': %s" %
-                   (dev, unique_id, err))
+      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
       result = False
   return result
 
@@ -1426,18 +1622,19 @@ def _TransformFileStorageDir(file_storage_dir):
 
   Args:
     file_storage_dir: string with path
-  
+
   Returns:
     normalized file_storage_dir (string) if valid, None otherwise
 
   """
+  cfg = _GetConfig()
   file_storage_dir = os.path.normpath(file_storage_dir)
-  base_file_storage_dir = ssconf.SimpleStore().GetFileStorageDir()
+  base_file_storage_dir = cfg.GetFileStorageDir()
   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
       base_file_storage_dir):
-    logger.Error("file storage directory '%s' is not under base file"
-                 " storage directory '%s'" %
-                 (file_storage_dir, base_file_storage_dir))
+    logging.error("file storage directory '%s' is not under base file"
+                  " storage directory '%s'",
+                  file_storage_dir, base_file_storage_dir)
     return None
   return file_storage_dir
 
@@ -1460,14 +1657,14 @@ def CreateFileStorageDir(file_storage_dir):
   else:
     if os.path.exists(file_storage_dir):
       if not os.path.isdir(file_storage_dir):
-        logger.Error("'%s' is not a directory" % file_storage_dir)
+        logging.error("'%s' is not a directory", file_storage_dir)
         result = False,
     else:
       try:
         os.makedirs(file_storage_dir, 0750)
       except OSError, err:
-        logger.Error("Cannot create file storage directory '%s': %s" %
-                     (file_storage_dir, err))
+        logging.error("Cannot create file storage directory '%s': %s",
+                      file_storage_dir, err)
         result = False,
   return result
 
@@ -1492,14 +1689,14 @@ def RemoveFileStorageDir(file_storage_dir):
   else:
     if os.path.exists(file_storage_dir):
       if not os.path.isdir(file_storage_dir):
-        logger.Error("'%s' is not a directory" % file_storage_dir)
+        logging.error("'%s' is not a directory", file_storage_dir)
         result = False,
       # deletes dir only if empty, otherwise we want to return False
       try:
         os.rmdir(file_storage_dir)
       except OSError, err:
-        logger.Error("Cannot remove file storage directory '%s': %s" %
-                     (file_storage_dir, err))
+        logging.exception("Cannot remove file storage directory '%s'",
+                          file_storage_dir)
         result = False,
   return result
 
@@ -1527,20 +1724,120 @@ def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
         try:
           os.rename(old_file_storage_dir, new_file_storage_dir)
         except OSError, err:
-          logger.Error("Cannot rename '%s' to '%s': %s"
-                       % (old_file_storage_dir, new_file_storage_dir, err))
+          logging.exception("Cannot rename '%s' to '%s'",
+                            old_file_storage_dir, new_file_storage_dir)
           result =  False,
       else:
-        logger.Error("'%s' is not a directory" % old_file_storage_dir)
+        logging.error("'%s' is not a directory", old_file_storage_dir)
         result = False,
     else:
       if os.path.exists(old_file_storage_dir):
-        logger.Error("Cannot rename '%s' to '%s'. Both locations exist." %
-                     old_file_storage_dir, new_file_storage_dir)
+        logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
+                      old_file_storage_dir, new_file_storage_dir)
         result = False,
   return result
 
 
+def _IsJobQueueFile(file_name):
+  """Checks whether the given filename is in the queue directory.
+
+  """
+  queue_dir = os.path.normpath(constants.QUEUE_DIR)
+  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
+
+  if not result:
+    logging.error("'%s' is not a file in the queue directory",
+                  file_name)
+
+  return result
+
+
+def JobQueueUpdate(file_name, content):
+  """Updates a file in the queue directory.
+
+  """
+  if not _IsJobQueueFile(file_name):
+    return False
+
+  # Write and replace the file atomically
+  utils.WriteFile(file_name, data=content)
+
+  return True
+
+
+def JobQueueRename(old, new):
+  """Renames a job queue file.
+
+  """
+  if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
+    return False
+
+  os.rename(old, new)
+
+  return True
+
+
+def JobQueueSetDrainFlag(drain_flag):
+  """Set the drain flag for the queue.
+
+  This will set or unset the queue drain flag.
+
+  @type drain_flag: bool
+  @param drain_flag: if True, will set the drain flag, otherwise reset it.
+
+  """
+  if drain_flag:
+    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
+  else:
+    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
+
+  return True
+
+
+def CloseBlockDevices(disks):
+  """Closes the given block devices.
+
+  This means they will be switched to secondary mode (in case of DRBD).
+
+  """
+  bdevs = []
+  for cf in disks:
+    rd = _RecursiveFindBD(cf)
+    if rd is None:
+      return (False, "Can't find device %s" % cf)
+    bdevs.append(rd)
+
+  msg = []
+  for rd in bdevs:
+    try:
+      rd.Close()
+    except errors.BlockDeviceError, err:
+      msg.append(str(err))
+  if msg:
+    return (False, "Can't make devices secondary: %s" % ",".join(msg))
+  else:
+    return (True, "All devices secondary")
+
+
+def ValidateHVParams(hvname, hvparams):
+  """Validates the given hypervisor parameters.
+
+  @type hvname: string
+  @param hvname: the hypervisor name
+  @type hvparams: dict
+  @param hvparams: the hypervisor parameters to be validated
+  @rtype: tuple (bool, str)
+  @return: tuple of (success, message)
+
+  """
+  try:
+    hv_type = hypervisor.GetHypervisor(hvname)
+    hv_type.ValidateParameters(hvparams)
+    return (True, "Validation passed")
+  except errors.HypervisorError, err:
+    return (False, str(err))
+
+
 class HooksRunner(object):
   """Hook runner.
 
@@ -1556,9 +1853,6 @@ class HooksRunner(object):
     Args:
       - hooks_base_dir: if not None, this overrides the
         constants.HOOKS_BASE_DIR (useful for unittests)
-      - logs_base_dir: if not None, this overrides the
-        constants.LOG_HOOKS_DIR (useful for unittests)
-      - logging: enable or disable logging of script output
 
     """
     if hooks_base_dir is None:
@@ -1570,7 +1864,6 @@ class HooksRunner(object):
     """Exec one hook script.
 
     Args:
-     - phase: the phase
      - script: the full path to the script
      - env: the environment with which to exec the script
 
@@ -1605,7 +1898,7 @@ class HooksRunner(object):
             fd.close()
           except EnvironmentError, err:
             # just log the error
-            #logger.Error("While closing fd %s: %s" % (fd, err))
+            #logging.exception("Error while closing fd %s", fd)
             pass
 
     return result == 0, output
@@ -1652,6 +1945,42 @@ class HooksRunner(object):
     return rr
 
 
+class IAllocatorRunner(object):
+  """IAllocator runner.
+
+  This class is instantiated on the node side (ganeti-noded) and not on
+  the master side.
+
+  """
+  def Run(self, name, idata):
+    """Run an iallocator script.
+
+    Return value: tuple of:
+       - run status (one of the IARUN_ constants)
+       - stdout
+       - stderr
+       - fail reason (as from utils.RunResult)
+
+    """
+    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
+                                  os.path.isfile)
+    if alloc_script is None:
+      return (constants.IARUN_NOTFOUND, None, None, None)
+
+    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
+    try:
+      os.write(fd, idata)
+      os.close(fd)
+      result = utils.RunCmd([alloc_script, fin_name])
+      if result.failed:
+        return (constants.IARUN_FAILURE, result.stdout, result.stderr,
+                result.fail_reason)
+    finally:
+      os.unlink(fin_name)
+
+    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
+
+
 class DevCacheManager(object):
   """Simple class for managing a cache of block device information.
 
@@ -1679,7 +2008,7 @@ class DevCacheManager(object):
 
     """
     if dev_path is None:
-      logger.Error("DevCacheManager.UpdateCache got a None dev_path")
+      logging.error("DevCacheManager.UpdateCache got a None dev_path")
       return
     fpath = cls._ConvertPath(dev_path)
     if on_primary:
@@ -1692,8 +2021,7 @@ class DevCacheManager(object):
     try:
       utils.WriteFile(fpath, data=fdata)
     except EnvironmentError, err:
-      logger.Error("Can't update bdev cache for %s, error %s" %
-                   (dev_path, str(err)))
+      logging.exception("Can't update bdev cache for %s", dev_path)
 
   @classmethod
   def RemoveCache(cls, dev_path):
@@ -1701,11 +2029,10 @@ class DevCacheManager(object):
 
     """
     if dev_path is None:
-      logger.Error("DevCacheManager.RemoveCache got a None dev_path")
+      logging.error("DevCacheManager.RemoveCache got a None dev_path")
       return
     fpath = cls._ConvertPath(dev_path)
     try:
       utils.RemoveFile(fpath)
     except EnvironmentError, err:
-      logger.Error("Can't update bdev cache for %s, error %s" %
-                   (dev_path, str(err)))
+      logging.exception("Can't update bdev cache for %s", dev_path)