Move the hooks file mask into constants.py
[ganeti-local] / lib / backend.py
index 70afa60..44e45f6 100644 (file)
 # 02110-1301, USA.
 
 
 # 02110-1301, USA.
 
 
-"""Functions used by the node daemon"""
+"""Functions used by the node daemon
+
+@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
+     the L{UploadFile} function
+
+"""
 
 
 import os
 
 
 import os
@@ -33,6 +38,8 @@ import subprocess
 import random
 import logging
 import tempfile
 import random
 import logging
 import tempfile
+import zlib
+import base64
 
 from ganeti import errors
 from ganeti import utils
 
 from ganeti import errors
 from ganeti import utils
@@ -67,7 +74,27 @@ def _GetSshRunner(cluster_name):
   return ssh.SshRunner(cluster_name)
 
 
   return ssh.SshRunner(cluster_name)
 
 
-def _CleanDirectory(path, exclude=[]):
+def _Decompress(data):
+  """Unpacks data compressed by the RPC client.
+
+  @type data: list or tuple
+  @param data: Data sent by RPC client
+  @rtype: str
+  @return: Decompressed data
+
+  """
+  assert isinstance(data, (list, tuple))
+  assert len(data) == 2
+  (encoding, content) = data
+  if encoding == constants.RPC_ENCODING_NONE:
+    return content
+  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
+    return zlib.decompress(base64.b64decode(content))
+  else:
+    raise AssertionError("Unknown data encoding")
+
+
+def _CleanDirectory(path, exclude=None):
   """Removes all regular files in a directory.
 
   @type path: str
   """Removes all regular files in a directory.
 
   @type path: str
@@ -75,14 +102,15 @@ def _CleanDirectory(path, exclude=[]):
   @type exclude: list
   @param exclude: list of files to be excluded, defaults
       to the empty list
   @type exclude: list
   @param exclude: list of files to be excluded, defaults
       to the empty list
-  @rtype: None
 
   """
   if not os.path.isdir(path):
     return
 
   """
   if not os.path.isdir(path):
     return
-
-  # Normalize excluded paths
-  exclude = [os.path.normpath(i) for i in exclude]
+  if exclude is None:
+    exclude = []
+  else:
+    # Normalize excluded paths
+    exclude = [os.path.normpath(i) for i in exclude]
 
   for rel_name in utils.ListVisibleFiles(path):
     full_name = os.path.normpath(os.path.join(path, rel_name))
 
   for rel_name in utils.ListVisibleFiles(path):
     full_name = os.path.normpath(os.path.join(path, rel_name))
@@ -92,6 +120,23 @@ def _CleanDirectory(path, exclude=[]):
       utils.RemoveFile(full_name)
 
 
       utils.RemoveFile(full_name)
 
 
+def _BuildUploadFileList():
+  """Build the list of allowed upload files.
+
+  This is abstracted so that it's built only once at module import time.
+
+  """
+  return frozenset([
+      constants.CLUSTER_CONF_FILE,
+      constants.ETC_HOSTS,
+      constants.SSH_KNOWN_HOSTS_FILE,
+      constants.VNC_PASSWORD_FILE,
+      ])
+
+
+_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
+
+
 def JobQueuePurge():
   """Removes job queue files and archived jobs.
 
 def JobQueuePurge():
   """Removes job queue files and archived jobs.
 
@@ -118,13 +163,13 @@ def GetMasterInfo():
     master_netdev = cfg.GetMasterNetdev()
     master_ip = cfg.GetMasterIP()
     master_node = cfg.GetMasterNode()
     master_netdev = cfg.GetMasterNetdev()
     master_ip = cfg.GetMasterIP()
     master_node = cfg.GetMasterNode()
-  except errors.ConfigurationError, err:
+  except errors.ConfigurationError:
     logging.exception("Cluster configuration incomplete")
     return (None, None, None)
   return (master_netdev, master_ip, master_node)
 
 
     logging.exception("Cluster configuration incomplete")
     return (None, None, None)
   return (master_netdev, master_ip, master_node)
 
 
-def StartMaster(start_daemons):
+def StartMaster(start_daemons, no_voting):
   """Activate local node as master node.
 
   The function will always try activate the IP address of the master
   """Activate local node as master node.
 
   The function will always try activate the IP address of the master
@@ -134,6 +179,9 @@ def StartMaster(start_daemons):
   @type start_daemons: boolean
   @param start_daemons: whther to also start the master
       daemons (ganeti-masterd and ganeti-rapi)
   @type start_daemons: boolean
   @param start_daemons: whther to also start the master
       daemons (ganeti-masterd and ganeti-rapi)
+  @type no_voting: boolean
+  @param no_voting: whether to start ganeti-masterd without a node vote
+      (if start_daemons is True), but still non-interactively
   @rtype: None
 
   """
   @rtype: None
 
   """
@@ -163,8 +211,17 @@ def StartMaster(start_daemons):
 
   # and now start the master and rapi daemons
   if start_daemons:
 
   # and now start the master and rapi daemons
   if start_daemons:
-    for daemon in 'ganeti-masterd', 'ganeti-rapi':
-      result = utils.RunCmd([daemon])
+    daemons_params = {
+        'ganeti-masterd': [],
+        'ganeti-rapi': [],
+        }
+    if no_voting:
+      daemons_params['ganeti-masterd'].append('--no-voting')
+      daemons_params['ganeti-masterd'].append('--yes-do-it')
+    for daemon in daemons_params:
+      cmd = [daemon]
+      cmd.extend(daemons_params[daemon])
+      result = utils.RunCmd(cmd)
       if result.failed:
         logging.error("Can't start daemon %s: %s", daemon, result.output)
         ok = False
       if result.failed:
         logging.error("Can't start daemon %s: %s", daemon, result.output)
         ok = False
@@ -237,8 +294,9 @@ def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
                                                     mkdir=True)
   except errors.OpExecError, err:
     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
                                                     mkdir=True)
   except errors.OpExecError, err:
-    logging.exception("Error while processing user ssh files")
-    return False
+    msg = "Error while processing user ssh files"
+    logging.exception(msg)
+    return (False, "%s: %s" % (msg, err))
 
   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
     utils.WriteFile(name, data=content, mode=0600)
 
   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
     utils.WriteFile(name, data=content, mode=0600)
@@ -247,7 +305,7 @@ def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
 
   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
 
 
   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
 
-  return True
+  return (True, "Node added successfully")
 
 
 def LeaveCluster():
 
 
 def LeaveCluster():
@@ -257,7 +315,7 @@ def LeaveCluster():
   from the cluster.
 
   If processing is successful, then it raises an
   from the cluster.
 
   If processing is successful, then it raises an
-  L{errors.GanetiQuitException} which is used as a special case to
+  L{errors.QuitGanetiException} which is used as a special case to
   shutdown the node daemon.
 
   """
   shutdown the node daemon.
 
   """
@@ -284,7 +342,7 @@ def LeaveCluster():
 
 
 def GetNodeInfo(vgname, hypervisor_type):
 
 
 def GetNodeInfo(vgname, hypervisor_type):
-  """Gives back a hash with different informations about the node.
+  """Gives back a hash with different information about the node.
 
   @type vgname: C{string}
   @param vgname: the name of the volume group to ask for disk space information
 
   @type vgname: C{string}
   @param vgname: the name of the volume group to ask for disk space information
@@ -350,37 +408,38 @@ def VerifyNode(what, cluster_name):
   """
   result = {}
 
   """
   result = {}
 
-  if 'hypervisor' in what:
-    result['hypervisor'] = my_dict = {}
-    for hv_name in what['hypervisor']:
-      my_dict[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
+  if constants.NV_HYPERVISOR in what:
+    result[constants.NV_HYPERVISOR] = tmp = {}
+    for hv_name in what[constants.NV_HYPERVISOR]:
+      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
 
 
-  if 'filelist' in what:
-    result['filelist'] = utils.FingerprintFiles(what['filelist'])
+  if constants.NV_FILELIST in what:
+    result[constants.NV_FILELIST] = utils.FingerprintFiles(
+      what[constants.NV_FILELIST])
 
 
-  if 'nodelist' in what:
-    result['nodelist'] = {}
-    random.shuffle(what['nodelist'])
-    for node in what['nodelist']:
+  if constants.NV_NODELIST in what:
+    result[constants.NV_NODELIST] = tmp = {}
+    random.shuffle(what[constants.NV_NODELIST])
+    for node in what[constants.NV_NODELIST]:
       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
       if not success:
       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
       if not success:
-        result['nodelist'][node] = message
-  if 'node-net-test' in what:
-    result['node-net-test'] = {}
+        tmp[node] = message
+
+  if constants.NV_NODENETTEST in what:
+    result[constants.NV_NODENETTEST] = tmp = {}
     my_name = utils.HostInfo().name
     my_pip = my_sip = None
     my_name = utils.HostInfo().name
     my_pip = my_sip = None
-    for name, pip, sip in what['node-net-test']:
+    for name, pip, sip in what[constants.NV_NODENETTEST]:
       if name == my_name:
         my_pip = pip
         my_sip = sip
         break
     if not my_pip:
       if name == my_name:
         my_pip = pip
         my_sip = sip
         break
     if not my_pip:
-      result['node-net-test'][my_name] = ("Can't find my own"
-                                          " primary/secondary IP"
-                                          " in the node list")
+      tmp[my_name] = ("Can't find my own primary/secondary IP"
+                      " in the node list")
     else:
       port = utils.GetNodeDaemonPort()
     else:
       port = utils.GetNodeDaemonPort()
-      for name, pip, sip in what['node-net-test']:
+      for name, pip, sip in what[constants.NV_NODENETTEST]:
         fail = []
         if not utils.TcpPing(pip, port, source=my_pip):
           fail.append("primary")
         fail = []
         if not utils.TcpPing(pip, port, source=my_pip):
           fail.append("primary")
@@ -388,9 +447,34 @@ def VerifyNode(what, cluster_name):
           if not utils.TcpPing(sip, port, source=my_sip):
             fail.append("secondary")
         if fail:
           if not utils.TcpPing(sip, port, source=my_sip):
             fail.append("secondary")
         if fail:
-          result['node-net-test'][name] = ("failure using the %s"
-                                           " interface(s)" %
-                                           " and ".join(fail))
+          tmp[name] = ("failure using the %s interface(s)" %
+                       " and ".join(fail))
+
+  if constants.NV_LVLIST in what:
+    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
+
+  if constants.NV_INSTANCELIST in what:
+    result[constants.NV_INSTANCELIST] = GetInstanceList(
+      what[constants.NV_INSTANCELIST])
+
+  if constants.NV_VGLIST in what:
+    result[constants.NV_VGLIST] = ListVolumeGroups()
+
+  if constants.NV_VERSION in what:
+    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
+                                    constants.RELEASE_VERSION)
+
+  if constants.NV_HVINFO in what:
+    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
+    result[constants.NV_HVINFO] = hyper.GetNodeInfo()
+
+  if constants.NV_DRBDLIST in what:
+    try:
+      used_minors = bdev.DRBD8.GetUsedDevs().keys()
+    except errors.BlockDeviceError, err:
+      logging.warning("Can't get used minors list", exc_info=True)
+      used_minors = str(err)
+    result[constants.NV_DRBDLIST] = used_minors
 
   return result
 
 
   return result
 
@@ -523,16 +607,15 @@ def GetInstanceList(hypervisor_list):
     try:
       names = hypervisor.GetHypervisor(hname).ListInstances()
       results.extend(names)
     try:
       names = hypervisor.GetHypervisor(hname).ListInstances()
       results.extend(names)
-    except errors.HypervisorError, err:
+    except errors.HypervisorError:
       logging.exception("Error enumerating instances for hypevisor %s", hname)
       logging.exception("Error enumerating instances for hypevisor %s", hname)
-      # FIXME: should we somehow not propagate this to the master?
       raise
 
   return results
 
 
 def GetInstanceInfo(instance, hname):
       raise
 
   return results
 
 
 def GetInstanceInfo(instance, hname):
-  """Gives back the informations about an instance as a dictionary.
+  """Gives back the information about an instance as a dictionary.
 
   @type instance: string
   @param instance: the instance name
 
   @type instance: string
   @param instance: the instance name
@@ -557,6 +640,30 @@ def GetInstanceInfo(instance, hname):
   return output
 
 
   return output
 
 
+def GetInstanceMigratable(instance):
+  """Gives whether an instance can be migrated.
+
+  @type instance: L{objects.Instance}
+  @param instance: object representing the instance to be checked.
+
+  @rtype: tuple
+  @return: tuple of (result, description) where:
+      - result: whether the instance can be migrated or not
+      - description: a description of the issue, if relevant
+
+  """
+  hyper = hypervisor.GetHypervisor(instance.hypervisor)
+  if instance.name not in hyper.ListInstances():
+    return (False, 'not running')
+
+  for idx in range(len(instance.disks)):
+    link_name = _GetBlockDevSymlinkPath(instance.name, idx)
+    if not os.path.islink(link_name):
+      return (False, 'not restarted since ganeti 1.2.5')
+
+  return (True, '')
+
+
 def GetAllInstancesInfo(hypervisor_list):
   """Gather data about all instances.
 
 def GetAllInstancesInfo(hypervisor_list):
   """Gather data about all instances.
 
@@ -587,15 +694,20 @@ def GetAllInstancesInfo(hypervisor_list):
           'state': state,
           'time': times,
           }
           'state': state,
           'time': times,
           }
-        if name in output and output[name] != value:
-          raise errors.HypervisorError("Instance %s running duplicate"
-                                       " with different parameters" % name)
+        if name in output:
+          # we only check static parameters, like memory and vcpus,
+          # and not state and time which can change between the
+          # invocations of the different hypervisors
+          for key in 'memory', 'vcpus':
+            if value[key] != output[name][key]:
+              raise errors.HypervisorError("Instance %s is running twice"
+                                           " with different parameters" % name)
         output[name] = value
 
   return output
 
 
         output[name] = value
 
   return output
 
 
-def AddOSToInstance(instance):
+def InstanceOsAdd(instance):
   """Add an OS to an instance.
 
   @type instance: L{objects.Instance}
   """Add an OS to an instance.
 
   @type instance: L{objects.Instance}
@@ -604,7 +716,15 @@ def AddOSToInstance(instance):
   @return: the success of the operation
 
   """
   @return: the success of the operation
 
   """
-  inst_os = OSFromDisk(instance.os)
+  try:
+    inst_os = OSFromDisk(instance.os)
+  except errors.InvalidOS, err:
+    os_name, os_dir, os_err = err.args
+    if os_dir is None:
+      return (False, "Can't find OS '%s': %s" % (os_name, os_err))
+    else:
+      return (False, "Error parsing OS '%s' in directory %s: %s" %
+              (os_name, os_dir, os_err))
 
   create_env = OSEnvironment(instance)
 
 
   create_env = OSEnvironment(instance)
 
@@ -617,9 +737,12 @@ def AddOSToInstance(instance):
     logging.error("os create command '%s' returned error: %s, logfile: %s,"
                   " output: %s", result.cmd, result.fail_reason, logfile,
                   result.output)
     logging.error("os create command '%s' returned error: %s, logfile: %s,"
                   " output: %s", result.cmd, result.fail_reason, logfile,
                   result.output)
-    return False
+    lines = [utils.SafeEncode(val)
+             for val in utils.TailFile(logfile, lines=20)]
+    return (False, "OS create script failed (%s), last lines in the"
+            " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
 
 
-  return True
+  return (True, "Successfully installed")
 
 
 def RunRenameInstance(instance, old_name):
 
 
 def RunRenameInstance(instance, old_name):
@@ -648,13 +771,16 @@ def RunRenameInstance(instance, old_name):
   if result.failed:
     logging.error("os create command '%s' returned error: %s output: %s",
                   result.cmd, result.fail_reason, result.output)
   if result.failed:
     logging.error("os create command '%s' returned error: %s output: %s",
                   result.cmd, result.fail_reason, result.output)
-    return False
+    lines = [utils.SafeEncode(val)
+             for val in utils.TailFile(logfile, lines=20)]
+    return (False, "OS rename script failed (%s), last lines in the"
+            " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
 
 
-  return True
+  return (True, "Rename successful")
 
 
 def _GetVGInfo(vg_name):
 
 
 def _GetVGInfo(vg_name):
-  """Get informations about the volume group.
+  """Get information about the volume group.
 
   @type vg_name: str
   @param vg_name: the volume group which we query
 
   @type vg_name: str
   @param vg_name: the volume group which we query
@@ -693,7 +819,53 @@ def _GetVGInfo(vg_name):
   return retdic
 
 
   return retdic
 
 
-def _GatherBlockDevs(instance):
+def _GetBlockDevSymlinkPath(instance_name, idx):
+  return os.path.join(constants.DISK_LINKS_DIR,
+                      "%s:%d" % (instance_name, idx))
+
+
+def _SymlinkBlockDev(instance_name, device_path, idx):
+  """Set up symlinks to a instance's block device.
+
+  This is an auxiliary function run when an instance is start (on the primary
+  node) or when an instance is migrated (on the target node).
+
+
+  @param instance_name: the name of the target instance
+  @param device_path: path of the physical block device, on the node
+  @param idx: the disk index
+  @return: absolute path to the disk's symlink
+
+  """
+  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
+  try:
+    os.symlink(device_path, link_name)
+  except OSError, err:
+    if err.errno == errno.EEXIST:
+      if (not os.path.islink(link_name) or
+          os.readlink(link_name) != device_path):
+        os.remove(link_name)
+        os.symlink(device_path, link_name)
+    else:
+      raise
+
+  return link_name
+
+
+def _RemoveBlockDevLinks(instance_name, disks):
+  """Remove the block device symlinks belonging to the given instance.
+
+  """
+  for idx, disk in enumerate(disks):
+    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
+    if os.path.islink(link_name):
+      try:
+        os.remove(link_name)
+      except OSError:
+        logging.exception("Can't remove symlink '%s'", link_name)
+
+
+def _GatherAndLinkBlockDevs(instance):
   """Set up an instance's block device(s).
 
   This is run on the primary node at instance startup. The block
   """Set up an instance's block device(s).
 
   This is run on the primary node at instance startup. The block
@@ -701,22 +873,29 @@ def _GatherBlockDevs(instance):
 
   @type instance: L{objects.Instance}
   @param instance: the instance whose disks we shoul assemble
 
   @type instance: L{objects.Instance}
   @param instance: the instance whose disks we shoul assemble
-  @rtype: list of L{bdev.BlockDev}
-  @return: list of the block devices
+  @rtype: list
+  @return: list of (disk_object, device_path)
 
   """
   block_devices = []
 
   """
   block_devices = []
-  for disk in instance.disks:
+  for idx, disk in enumerate(instance.disks):
     device = _RecursiveFindBD(disk)
     if device is None:
       raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                     str(disk))
     device.Open()
     device = _RecursiveFindBD(disk)
     if device is None:
       raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                     str(disk))
     device.Open()
-    block_devices.append((disk, device))
+    try:
+      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
+    except OSError, e:
+      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
+                                    e.strerror)
+
+    block_devices.append((disk, link_name))
+
   return block_devices
 
 
   return block_devices
 
 
-def StartInstance(instance, extra_args):
+def StartInstance(instance):
   """Start an instance.
 
   @type instance: L{objects.Instance}
   """Start an instance.
 
   @type instance: L{objects.Instance}
@@ -728,21 +907,24 @@ def StartInstance(instance, extra_args):
   running_instances = GetInstanceList([instance.hypervisor])
 
   if instance.name in running_instances:
   running_instances = GetInstanceList([instance.hypervisor])
 
   if instance.name in running_instances:
-    return True
-
-  block_devices = _GatherBlockDevs(instance)
-  hyper = hypervisor.GetHypervisor(instance.hypervisor)
+    return (True, "Already running")
 
   try:
 
   try:
-    hyper.StartInstance(instance, block_devices, extra_args)
+    block_devices = _GatherAndLinkBlockDevs(instance)
+    hyper = hypervisor.GetHypervisor(instance.hypervisor)
+    hyper.StartInstance(instance, block_devices)
+  except errors.BlockDeviceError, err:
+    logging.exception("Failed to start instance")
+    return (False, "Block device error: %s" % str(err))
   except errors.HypervisorError, err:
     logging.exception("Failed to start instance")
   except errors.HypervisorError, err:
     logging.exception("Failed to start instance")
-    return False
+    _RemoveBlockDevLinks(instance.name, instance.disks)
+    return (False, "Hypervisor error: %s" % str(err))
 
 
-  return True
+  return (True, "Instance started successfully")
 
 
 
 
-def ShutdownInstance(instance):
+def InstanceShutdown(instance):
   """Shut an instance down.
 
   @note: this functions uses polling with a hardcoded timeout.
   """Shut an instance down.
 
   @note: this functions uses polling with a hardcoded timeout.
@@ -757,43 +939,48 @@ def ShutdownInstance(instance):
   running_instances = GetInstanceList([hv_name])
 
   if instance.name not in running_instances:
   running_instances = GetInstanceList([hv_name])
 
   if instance.name not in running_instances:
-    return True
+    return (True, "Instance already stopped")
 
   hyper = hypervisor.GetHypervisor(hv_name)
   try:
     hyper.StopInstance(instance)
   except errors.HypervisorError, err:
 
   hyper = hypervisor.GetHypervisor(hv_name)
   try:
     hyper.StopInstance(instance)
   except errors.HypervisorError, err:
-    logging.error("Failed to stop instance")
-    return False
+    msg = "Failed to stop instance %s: %s" % (instance.name, err)
+    logging.error(msg)
+    return (False, msg)
 
   # test every 10secs for 2min
 
   # test every 10secs for 2min
-  shutdown_ok = False
 
   time.sleep(1)
 
   time.sleep(1)
-  for dummy in range(11):
+  for _ in range(11):
     if instance.name not in GetInstanceList([hv_name]):
       break
     time.sleep(10)
   else:
     # the shutdown did not succeed
     if instance.name not in GetInstanceList([hv_name]):
       break
     time.sleep(10)
   else:
     # the shutdown did not succeed
-    logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
+    logging.error("Shutdown of '%s' unsuccessful, using destroy",
+                  instance.name)
 
     try:
       hyper.StopInstance(instance, force=True)
     except errors.HypervisorError, err:
 
     try:
       hyper.StopInstance(instance, force=True)
     except errors.HypervisorError, err:
-      logging.exception("Failed to stop instance")
-      return False
+      msg = "Failed to force stop instance %s: %s" % (instance.name, err)
+      logging.error(msg)
+      return (False, msg)
 
     time.sleep(1)
     if instance.name in GetInstanceList([hv_name]):
 
     time.sleep(1)
     if instance.name in GetInstanceList([hv_name]):
-      logging.error("could not shutdown instance '%s' even by destroy",
-                    instance.name)
-      return False
+      msg = ("Could not shutdown instance %s even by destroy" %
+             instance.name)
+      logging.error(msg)
+      return (False, msg)
 
 
-  return True
+  _RemoveBlockDevLinks(instance.name, instance.disks)
+
+  return (True, "Instance has been shutdown successfully")
 
 
 
 
-def RebootInstance(instance, reboot_type, extra_args):
+def InstanceReboot(instance, reboot_type):
   """Reboot an instance.
 
   @type instance: L{objects.Instance}
   """Reboot an instance.
 
   @type instance: L{objects.Instance}
@@ -805,9 +992,10 @@ def RebootInstance(instance, reboot_type, extra_args):
         instance OS, do not recreate the VM
       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
         restart the VM (at the hypervisor level)
         instance OS, do not recreate the VM
       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
         restart the VM (at the hypervisor level)
-      - the other reboot type (L{constants.INSTANCE_REBOOT_HARD})
-        is not accepted here, since that mode is handled
-        differently
+      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
+        not accepted here, since that mode is handled differently, in
+        cmdlib, and translates into full stop and start of the
+        instance (instead of a call_instance_reboot RPC)
   @rtype: boolean
   @return: the success of the operation
 
   @rtype: boolean
   @return: the success of the operation
 
@@ -815,27 +1003,91 @@ def RebootInstance(instance, reboot_type, extra_args):
   running_instances = GetInstanceList([instance.hypervisor])
 
   if instance.name not in running_instances:
   running_instances = GetInstanceList([instance.hypervisor])
 
   if instance.name not in running_instances:
-    logging.error("Cannot reboot instance that is not running")
-    return False
+    msg = "Cannot reboot instance %s that is not running" % instance.name
+    logging.error(msg)
+    return (False, msg)
 
   hyper = hypervisor.GetHypervisor(instance.hypervisor)
   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
     try:
       hyper.RebootInstance(instance)
     except errors.HypervisorError, err:
 
   hyper = hypervisor.GetHypervisor(instance.hypervisor)
   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
     try:
       hyper.RebootInstance(instance)
     except errors.HypervisorError, err:
-      logging.exception("Failed to soft reboot instance")
-      return False
+      msg = "Failed to soft reboot instance %s: %s" % (instance.name, err)
+      logging.error(msg)
+      return (False, msg)
   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
     try:
   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
     try:
-      ShutdownInstance(instance)
-      StartInstance(instance, extra_args)
+      stop_result = InstanceShutdown(instance)
+      if not stop_result[0]:
+        return stop_result
+      return StartInstance(instance)
     except errors.HypervisorError, err:
     except errors.HypervisorError, err:
-      logging.exception("Failed to hard reboot instance")
-      return False
+      msg = "Failed to hard reboot instance %s: %s" % (instance.name, err)
+      logging.error(msg)
+      return (False, msg)
   else:
   else:
-    raise errors.ParameterError("reboot_type invalid")
+    return (False, "Invalid reboot_type received: %s" % (reboot_type,))
+
+  return (True, "Reboot successful")
 
 
-  return True
+
+def MigrationInfo(instance):
+  """Gather information about an instance to be migrated.
+
+  @type instance: L{objects.Instance}
+  @param instance: the instance definition
+
+  """
+  hyper = hypervisor.GetHypervisor(instance.hypervisor)
+  try:
+    info = hyper.MigrationInfo(instance)
+  except errors.HypervisorError, err:
+    msg = "Failed to fetch migration information"
+    logging.exception(msg)
+    return (False, '%s: %s' % (msg, err))
+  return (True, info)
+
+
+def AcceptInstance(instance, info, target):
+  """Prepare the node to accept an instance.
+
+  @type instance: L{objects.Instance}
+  @param instance: the instance definition
+  @type info: string/data (opaque)
+  @param info: migration information, from the source node
+  @type target: string
+  @param target: target host (usually ip), on this node
+
+  """
+  hyper = hypervisor.GetHypervisor(instance.hypervisor)
+  try:
+    hyper.AcceptInstance(instance, info, target)
+  except errors.HypervisorError, err:
+    msg = "Failed to accept instance"
+    logging.exception(msg)
+    return (False, '%s: %s' % (msg, err))
+  return (True, "Accept successful")
+
+
+def FinalizeMigration(instance, info, success):
+  """Finalize any preparation to accept an instance.
+
+  @type instance: L{objects.Instance}
+  @param instance: the instance definition
+  @type info: string/data (opaque)
+  @param info: migration information, from the source node
+  @type success: boolean
+  @param success: whether the migration was a success or a failure
+
+  """
+  hyper = hypervisor.GetHypervisor(instance.hypervisor)
+  try:
+    hyper.FinalizeMigration(instance, info, success)
+  except errors.HypervisorError, err:
+    msg = "Failed to finalize migration"
+    logging.exception(msg)
+    return (False, '%s: %s' % (msg, err))
+  return (True, "Migration Finalized")
 
 
 def MigrateInstance(instance, target, live):
 
 
 def MigrateInstance(instance, target, live):
@@ -854,18 +1106,18 @@ def MigrateInstance(instance, target, live):
       - msg is a string with details in case of failure
 
   """
       - msg is a string with details in case of failure
 
   """
-  hyper = hypervisor.GetHypervisor(instance.hypervisor_name)
+  hyper = hypervisor.GetHypervisor(instance.hypervisor)
 
   try:
     hyper.MigrateInstance(instance.name, target, live)
   except errors.HypervisorError, err:
 
   try:
     hyper.MigrateInstance(instance.name, target, live)
   except errors.HypervisorError, err:
-    msg = "Failed to migrate instance: %s" % str(err)
-    logging.error(msg)
-    return (False, msg)
-  return (True, "Migration successfull")
+    msg = "Failed to migrate instance"
+    logging.exception(msg)
+    return (False, "%s: %s" % (msg, err))
+  return (True, "Migration successful")
 
 
 
 
-def CreateBlockDevice(disk, size, owner, on_primary, info):
+def BlockdevCreate(disk, size, owner, on_primary, info):
   """Creates a block device for an instance.
 
   @type disk: L{objects.Disk}
   """Creates a block device for an instance.
 
   @type disk: L{objects.Disk}
@@ -889,73 +1141,91 @@ def CreateBlockDevice(disk, size, owner, on_primary, info):
   clist = []
   if disk.children:
     for child in disk.children:
   clist = []
   if disk.children:
     for child in disk.children:
-      crdev = _RecursiveAssembleBD(child, owner, on_primary)
+      try:
+        crdev = _RecursiveAssembleBD(child, owner, on_primary)
+      except errors.BlockDeviceError, err:
+        errmsg = "Can't assemble device %s: %s" % (child, err)
+        logging.error(errmsg)
+        return False, errmsg
       if on_primary or disk.AssembleOnSecondary():
         # we need the children open in case the device itself has to
         # be assembled
       if on_primary or disk.AssembleOnSecondary():
         # we need the children open in case the device itself has to
         # be assembled
-        crdev.Open()
+        try:
+          crdev.Open()
+        except errors.BlockDeviceError, err:
+          errmsg = "Can't make child '%s' read-write: %s" % (child, err)
+          logging.error(errmsg)
+          return False, errmsg
       clist.append(crdev)
       clist.append(crdev)
+
   try:
   try:
-    device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
-    if device is not None:
-      logging.info("removing existing device %s", disk)
-      device.Remove()
+    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
   except errors.BlockDeviceError, err:
   except errors.BlockDeviceError, err:
-    pass
+    return False, "Can't create block device: %s" % str(err)
 
 
-  device = bdev.Create(disk.dev_type, disk.physical_id,
-                       clist, size)
-  if device is None:
-    raise ValueError("Can't create child device for %s, %s" %
-                     (disk, size))
   if on_primary or disk.AssembleOnSecondary():
   if on_primary or disk.AssembleOnSecondary():
-    if not device.Assemble():
-      errorstring = "Can't assemble device after creation"
-      logging.error(errorstring)
-      raise errors.BlockDeviceError("%s, very unusual event - check the node"
-                                    " daemon logs" % errorstring)
+    try:
+      device.Assemble()
+    except errors.BlockDeviceError, err:
+      errmsg = ("Can't assemble device after creation, very"
+                " unusual event: %s" % str(err))
+      logging.error(errmsg)
+      return False, errmsg
     device.SetSyncSpeed(constants.SYNC_SPEED)
     if on_primary or disk.OpenOnSecondary():
     device.SetSyncSpeed(constants.SYNC_SPEED)
     if on_primary or disk.OpenOnSecondary():
-      device.Open(force=True)
+      try:
+        device.Open(force=True)
+      except errors.BlockDeviceError, err:
+        errmsg = ("Can't make device r/w after creation, very"
+                  " unusual event: %s" % str(err))
+        logging.error(errmsg)
+        return False, errmsg
     DevCacheManager.UpdateCache(device.dev_path, owner,
                                 on_primary, disk.iv_name)
 
   device.SetInfo(info)
 
   physical_id = device.unique_id
     DevCacheManager.UpdateCache(device.dev_path, owner,
                                 on_primary, disk.iv_name)
 
   device.SetInfo(info)
 
   physical_id = device.unique_id
-  return physical_id
+  return True, physical_id
 
 
 
 
-def RemoveBlockDevice(disk):
+def BlockdevRemove(disk):
   """Remove a block device.
 
   @note: This is intended to be called recursively.
 
   """Remove a block device.
 
   @note: This is intended to be called recursively.
 
-  @type disk: L{objects.disk}
+  @type disk: L{objects.Disk}
   @param disk: the disk object we should remove
   @rtype: boolean
   @return: the success of the operation
 
   """
   @param disk: the disk object we should remove
   @rtype: boolean
   @return: the success of the operation
 
   """
+  msgs = []
+  result = True
   try:
   try:
-    # since we are removing the device, allow a partial match
-    # this allows removal of broken mirrors
-    rdev = _RecursiveFindBD(disk, allow_partial=True)
+    rdev = _RecursiveFindBD(disk)
   except errors.BlockDeviceError, err:
     # probably can't attach
     logging.info("Can't attach to device %s in remove", disk)
     rdev = None
   if rdev is not None:
     r_path = rdev.dev_path
   except errors.BlockDeviceError, err:
     # probably can't attach
     logging.info("Can't attach to device %s in remove", disk)
     rdev = None
   if rdev is not None:
     r_path = rdev.dev_path
-    result = rdev.Remove()
+    try:
+      rdev.Remove()
+    except errors.BlockDeviceError, err:
+      msgs.append(str(err))
+      result = False
     if result:
       DevCacheManager.RemoveCache(r_path)
     if result:
       DevCacheManager.RemoveCache(r_path)
-  else:
-    result = True
+
   if disk.children:
     for child in disk.children:
   if disk.children:
     for child in disk.children:
-      result = result and RemoveBlockDevice(child)
-  return result
+      c_status, c_msg = BlockdevRemove(child)
+      result = result and c_status
+      if c_msg: # not an empty message
+        msgs.append(c_msg)
+
+  return (result, "; ".join(msgs))
 
 
 def _RecursiveAssembleBD(disk, owner, as_primary):
 
 
 def _RecursiveAssembleBD(disk, owner, as_primary):
@@ -994,11 +1264,12 @@ def _RecursiveAssembleBD(disk, owner, as_primary):
         if children.count(None) >= mcn:
           raise
         cdev = None
         if children.count(None) >= mcn:
           raise
         cdev = None
-        logging.debug("Error in child activation: %s", str(err))
+        logging.error("Error in child activation (but continuing): %s",
+                      str(err))
       children.append(cdev)
 
   if as_primary or disk.AssembleOnSecondary():
       children.append(cdev)
 
   if as_primary or disk.AssembleOnSecondary():
-    r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
+    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
     result = r_dev
     if as_primary or disk.OpenOnSecondary():
     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
     result = r_dev
     if as_primary or disk.OpenOnSecondary():
@@ -1011,7 +1282,7 @@ def _RecursiveAssembleBD(disk, owner, as_primary):
   return result
 
 
   return result
 
 
-def AssembleBlockDevice(disk, owner, as_primary):
+def BlockdevAssemble(disk, owner, as_primary):
   """Activate a block device for an instance.
 
   This is a wrapper over _RecursiveAssembleBD.
   """Activate a block device for an instance.
 
   This is a wrapper over _RecursiveAssembleBD.
@@ -1021,17 +1292,24 @@ def AssembleBlockDevice(disk, owner, as_primary):
       C{True} for secondary nodes
 
   """
       C{True} for secondary nodes
 
   """
-  result = _RecursiveAssembleBD(disk, owner, as_primary)
-  if isinstance(result, bdev.BlockDev):
-    result = result.dev_path
-  return result
+  status = True
+  result = "no error information"
+  try:
+    result = _RecursiveAssembleBD(disk, owner, as_primary)
+    if isinstance(result, bdev.BlockDev):
+      result = result.dev_path
+  except errors.BlockDeviceError, err:
+    result = "Error while assembling disk: %s" % str(err)
+    status = False
+  return (status, result)
 
 
 
 
-def ShutdownBlockDevice(disk):
+def BlockdevShutdown(disk):
   """Shut down a block device.
 
   """Shut down a block device.
 
-  First, if the device is assembled (can L{Attach()}), then the device
-  is shutdown. Then the children of the device are shutdown.
+  First, if the device is assembled (Attach() is successful), then
+  the device is shutdown. Then the children of the device are
+  shutdown.
 
   This function is called recursively. Note that we don't cache the
   children or such, as oppossed to assemble, shutdown of different
 
   This function is called recursively. Note that we don't cache the
   children or such, as oppossed to assemble, shutdown of different
@@ -1044,21 +1322,29 @@ def ShutdownBlockDevice(disk):
   @return: the success of the operation
 
   """
   @return: the success of the operation
 
   """
+  msgs = []
+  result = True
   r_dev = _RecursiveFindBD(disk)
   if r_dev is not None:
     r_path = r_dev.dev_path
   r_dev = _RecursiveFindBD(disk)
   if r_dev is not None:
     r_path = r_dev.dev_path
-    result = r_dev.Shutdown()
-    if result:
+    try:
+      r_dev.Shutdown()
       DevCacheManager.RemoveCache(r_path)
       DevCacheManager.RemoveCache(r_path)
-  else:
-    result = True
+    except errors.BlockDeviceError, err:
+      msgs.append(str(err))
+      result = False
+
   if disk.children:
     for child in disk.children:
   if disk.children:
     for child in disk.children:
-      result = result and ShutdownBlockDevice(child)
-  return result
+      c_status, c_msg = BlockdevShutdown(child)
+      result = result and c_status
+      if c_msg: # not an empty message
+        msgs.append(c_msg)
 
 
+  return (result, "; ".join(msgs))
 
 
-def MirrorAddChildren(parent_cdev, new_cdevs):
+
+def BlockdevAddchildren(parent_cdev, new_cdevs):
   """Extend a mirrored block device.
 
   @type parent_cdev: L{objects.Disk}
   """Extend a mirrored block device.
 
   @type parent_cdev: L{objects.Disk}
@@ -1069,7 +1355,7 @@ def MirrorAddChildren(parent_cdev, new_cdevs):
   @return: the success of the operation
 
   """
   @return: the success of the operation
 
   """
-  parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
+  parent_bdev = _RecursiveFindBD(parent_cdev)
   if parent_bdev is None:
     logging.error("Can't find parent device")
     return False
   if parent_bdev is None:
     logging.error("Can't find parent device")
     return False
@@ -1082,7 +1368,7 @@ def MirrorAddChildren(parent_cdev, new_cdevs):
   return True
 
 
   return True
 
 
-def MirrorRemoveChildren(parent_cdev, new_cdevs):
+def BlockdevRemovechildren(parent_cdev, new_cdevs):
   """Shrink a mirrored block device.
 
   @type parent_cdev: L{objects.Disk}
   """Shrink a mirrored block device.
 
   @type parent_cdev: L{objects.Disk}
@@ -1114,7 +1400,7 @@ def MirrorRemoveChildren(parent_cdev, new_cdevs):
   return True
 
 
   return True
 
 
-def GetMirrorStatus(disks):
+def BlockdevGetmirrorstatus(disks):
   """Get the mirroring status of a list of devices.
 
   @type disks: list of L{objects.Disk}
   """Get the mirroring status of a list of devices.
 
   @type disks: list of L{objects.Disk}
@@ -1122,7 +1408,7 @@ def GetMirrorStatus(disks):
   @rtype: disk
   @return:
       a list of (mirror_done, estimated_time) tuples, which
   @rtype: disk
   @return:
       a list of (mirror_done, estimated_time) tuples, which
-      are the result of L{bdev.BlockDevice.CombinedSyncStatus}
+      are the result of L{bdev.BlockDev.CombinedSyncStatus}
   @raise errors.BlockDeviceError: if any of the disks cannot be
       found
 
   @raise errors.BlockDeviceError: if any of the disks cannot be
       found
 
@@ -1136,17 +1422,13 @@ def GetMirrorStatus(disks):
   return stats
 
 
   return stats
 
 
-def _RecursiveFindBD(disk, allow_partial=False):
+def _RecursiveFindBD(disk):
   """Check if a device is activated.
 
   """Check if a device is activated.
 
-  If so, return informations about the real device.
+  If so, return information about the real device.
 
   @type disk: L{objects.Disk}
   @param disk: the disk object we need to find
 
   @type disk: L{objects.Disk}
   @param disk: the disk object we need to find
-  @type allow_partial: boolean
-  @param allow_partial: if true, don't abort the find if a
-      child of the device can't be found; this is intended
-      to be used when repairing mirrors
 
   @return: None if the device can't be found,
       otherwise the device instance
 
   @return: None if the device can't be found,
       otherwise the device instance
@@ -1157,13 +1439,13 @@ def _RecursiveFindBD(disk, allow_partial=False):
     for chdisk in disk.children:
       children.append(_RecursiveFindBD(chdisk))
 
     for chdisk in disk.children:
       children.append(_RecursiveFindBD(chdisk))
 
-  return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
+  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
 
 
 
 
-def FindBlockDevice(disk):
+def BlockdevFind(disk):
   """Check if a device is activated.
 
   """Check if a device is activated.
 
-  If it is, return informations about the real device.
+  If it is, return information about the real device.
 
   @type disk: L{objects.Disk}
   @param disk: the disk to find
 
   @type disk: L{objects.Disk}
   @param disk: the disk to find
@@ -1173,10 +1455,39 @@ def FindBlockDevice(disk):
       estimated_time, is_degraded)
 
   """
       estimated_time, is_degraded)
 
   """
-  rbd = _RecursiveFindBD(disk)
+  try:
+    rbd = _RecursiveFindBD(disk)
+  except errors.BlockDeviceError, err:
+    return (False, str(err))
   if rbd is None:
   if rbd is None:
-    return rbd
-  return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
+    return (True, None)
+  return (True, (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus())
+
+
+def BlockdevGetsize(disks):
+  """Computes the size of the given disks.
+
+  If a disk is not found, returns None instead.
+
+  @type disks: list of L{objects.Disk}
+  @param disks: the list of disk to compute the size for
+  @rtype: list
+  @return: list with elements None if the disk cannot be found,
+      otherwise the size
+
+  """
+  result = []
+  for cf in disks:
+    try:
+      rbd = _RecursiveFindBD(cf)
+    except errors.BlockDeviceError, err:
+      result.append(None)
+      continue
+    if rbd is None:
+      result.append(None)
+    else:
+      result.append(rbd.GetActualSize())
+  return result
 
 
 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
 
 
 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
@@ -1209,19 +1520,14 @@ def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
                   file_name)
     return False
 
                   file_name)
     return False
 
-  allowed_files = [
-    constants.CLUSTER_CONF_FILE,
-    constants.ETC_HOSTS,
-    constants.SSH_KNOWN_HOSTS_FILE,
-    constants.VNC_PASSWORD_FILE,
-    ]
-
-  if file_name not in allowed_files:
+  if file_name not in _ALLOWED_UPLOAD_FILES:
     logging.error("Filename passed to UploadFile not in allowed"
                  " upload targets: '%s'", file_name)
     return False
 
     logging.error("Filename passed to UploadFile not in allowed"
                  " upload targets: '%s'", file_name)
     return False
 
-  utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
+  raw_data = _Decompress(data)
+
+  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
                   atime=atime, mtime=mtime)
   return True
 
                   atime=atime, mtime=mtime)
   return True
 
@@ -1408,6 +1714,7 @@ def OSEnvironment(instance, debug=0):
   result = {}
   result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
   result['INSTANCE_NAME'] = instance.name
   result = {}
   result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
   result['INSTANCE_NAME'] = instance.name
+  result['INSTANCE_OS'] = instance.os
   result['HYPERVISOR'] = instance.hypervisor
   result['DISK_COUNT'] = '%d' % len(instance.disks)
   result['NIC_COUNT'] = '%d' % len(instance.nics)
   result['HYPERVISOR'] = instance.hypervisor
   result['DISK_COUNT'] = '%d' % len(instance.disks)
   result['NIC_COUNT'] = '%d' % len(instance.nics)
@@ -1419,8 +1726,7 @@ def OSEnvironment(instance, debug=0):
                                     str(disk))
     real_disk.Open()
     result['DISK_%d_PATH' % idx] = real_disk.dev_path
                                     str(disk))
     real_disk.Open()
     result['DISK_%d_PATH' % idx] = real_disk.dev_path
-    # FIXME: When disks will have read-only mode, populate this
-    result['DISK_%d_ACCESS' % idx] = 'W'
+    result['DISK_%d_ACCESS' % idx] = disk.mode
     if constants.HV_DISK_TYPE in instance.hvparams:
       result['DISK_%d_FRONTEND_TYPE' % idx] = \
         instance.hvparams[constants.HV_DISK_TYPE]
     if constants.HV_DISK_TYPE in instance.hvparams:
       result['DISK_%d_FRONTEND_TYPE' % idx] = \
         instance.hvparams[constants.HV_DISK_TYPE]
@@ -1438,9 +1744,13 @@ def OSEnvironment(instance, debug=0):
       result['NIC_%d_FRONTEND_TYPE' % idx] = \
         instance.hvparams[constants.HV_NIC_TYPE]
 
       result['NIC_%d_FRONTEND_TYPE' % idx] = \
         instance.hvparams[constants.HV_NIC_TYPE]
 
+  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
+    for key, value in source.items():
+      result["INSTANCE_%s_%s" % (kind, key)] = str(value)
+
   return result
 
   return result
 
-def GrowBlockDevice(disk, amount):
+def BlockdevGrow(disk, amount):
   """Grow a stack of block devices.
 
   This function is called recursively, with the childrens being the
   """Grow a stack of block devices.
 
   This function is called recursively, with the childrens being the
@@ -1466,7 +1776,7 @@ def GrowBlockDevice(disk, amount):
   return True, None
 
 
   return True, None
 
 
-def SnapshotBlockDevice(disk):
+def BlockdevSnapshot(disk):
   """Create a snapshot copy of a block device.
 
   This function is called recursively, and the snapshot is actually created
   """Create a snapshot copy of a block device.
 
   This function is called recursively, and the snapshot is actually created
@@ -1481,13 +1791,13 @@ def SnapshotBlockDevice(disk):
   if disk.children:
     if len(disk.children) == 1:
       # only one child, let's recurse on it
   if disk.children:
     if len(disk.children) == 1:
       # only one child, let's recurse on it
-      return SnapshotBlockDevice(disk.children[0])
+      return BlockdevSnapshot(disk.children[0])
     else:
       # more than one child, choose one that matches
       for child in disk.children:
         if child.size == disk.size:
           # return implies breaking the loop
     else:
       # more than one child, choose one that matches
       for child in disk.children:
         if child.size == disk.size:
           # return implies breaking the loop
-          return SnapshotBlockDevice(child)
+          return BlockdevSnapshot(child)
   elif disk.dev_type == constants.LD_LV:
     r_dev = _RecursiveFindBD(disk)
     if r_dev is not None:
   elif disk.dev_type == constants.LD_LV:
     r_dev = _RecursiveFindBD(disk)
     if r_dev is not None:
@@ -1543,8 +1853,8 @@ def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
   # the target command is built out of three individual commands,
   # which are joined by pipes; we check each individual command for
   # valid parameters
   # the target command is built out of three individual commands,
   # which are joined by pipes; we check each individual command for
   # valid parameters
-  expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
-                               export_script, logfile)
+  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
+                               inst_os.path, export_script, logfile)
 
   comprcmd = "gzip"
 
 
   comprcmd = "gzip"
 
@@ -1557,7 +1867,7 @@ def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
   # all commands have been checked, so we're safe to combine them
   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
 
   # all commands have been checked, so we're safe to combine them
   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
 
-  result = utils.RunCmd(command, env=export_env)
+  result = utils.RunCmd(["bash", "-c", command], env=export_env)
 
   if result.failed:
     logging.error("os snapshot export command '%s' returned error: %s"
 
   if result.failed:
     logging.error("os snapshot export command '%s' returned error: %s"
@@ -1601,15 +1911,16 @@ def FinalizeExport(instance, snap_disks):
              instance.beparams[constants.BE_VCPUS])
   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
 
              instance.beparams[constants.BE_VCPUS])
   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
 
-  nic_count = 0
+  nic_total = 0
   for nic_count, nic in enumerate(instance.nics):
   for nic_count, nic in enumerate(instance.nics):
+    nic_total += 1
     config.set(constants.INISECT_INS, 'nic%d_mac' %
                nic_count, '%s' % nic.mac)
     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
                '%s' % nic.bridge)
   # TODO: redundant: on load can read nics until it doesn't exist
     config.set(constants.INISECT_INS, 'nic%d_mac' %
                nic_count, '%s' % nic.mac)
     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
                '%s' % nic.bridge)
   # TODO: redundant: on load can read nics until it doesn't exist
-  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
+  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
 
   disk_total = 0
   for disk_count, disk in enumerate(snap_disks):
 
   disk_total = 0
   for disk_count, disk in enumerate(snap_disks):
@@ -1736,7 +2047,7 @@ def RemoveExport(export):
   return True
 
 
   return True
 
 
-def RenameBlockDevices(devlist):
+def BlockdevRename(devlist):
   """Rename a list of block devices.
 
   @type devlist: list of tuples
   """Rename a list of block devices.
 
   @type devlist: list of tuples
@@ -1766,7 +2077,7 @@ def RenameBlockDevices(devlist):
         # but we don't have the owner here - maybe parse from existing
         # cache? for now, we only lose lvm data when we rename, which
         # is less critical than DRBD or MD
         # but we don't have the owner here - maybe parse from existing
         # cache? for now, we only lose lvm data when we rename, which
         # is less critical than DRBD or MD
-    except errors.BlockDeviceError, err:
+    except errors.BlockDeviceError:
       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
       result = False
   return result
       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
       result = False
   return result
@@ -1836,7 +2147,7 @@ def RemoveFileStorageDir(file_storage_dir):
   @param file_storage_dir: the directory we should cleanup
   @rtype: tuple (success,)
   @return: tuple of one element, C{success}, denoting
   @param file_storage_dir: the directory we should cleanup
   @rtype: tuple (success,)
   @return: tuple of one element, C{success}, denoting
-      whether the operation was successfull
+      whether the operation was successful
 
   """
   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
 
   """
   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
@@ -1851,7 +2162,7 @@ def RemoveFileStorageDir(file_storage_dir):
       # deletes dir only if empty, otherwise we want to return False
       try:
         os.rmdir(file_storage_dir)
       # deletes dir only if empty, otherwise we want to return False
       try:
         os.rmdir(file_storage_dir)
-      except OSError, err:
+      except OSError:
         logging.exception("Cannot remove file storage directory '%s'",
                           file_storage_dir)
         result = False,
         logging.exception("Cannot remove file storage directory '%s'",
                           file_storage_dir)
         result = False,
@@ -1880,7 +2191,7 @@ def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
       if os.path.isdir(old_file_storage_dir):
         try:
           os.rename(old_file_storage_dir, new_file_storage_dir)
       if os.path.isdir(old_file_storage_dir):
         try:
           os.rename(old_file_storage_dir, new_file_storage_dir)
-        except OSError, err:
+        except OSError:
           logging.exception("Cannot rename '%s' to '%s'",
                             old_file_storage_dir, new_file_storage_dir)
           result =  False,
           logging.exception("Cannot rename '%s' to '%s'",
                             old_file_storage_dir, new_file_storage_dir)
           result =  False,
@@ -1932,7 +2243,7 @@ def JobQueueUpdate(file_name, content):
     return False
 
   # Write and replace the file atomically
     return False
 
   # Write and replace the file atomically
-  utils.WriteFile(file_name, data=content)
+  utils.WriteFile(file_name, data=_Decompress(content))
 
   return True
 
 
   return True
 
@@ -1940,7 +2251,7 @@ def JobQueueUpdate(file_name, content):
 def JobQueueRename(old, new):
   """Renames a job queue file.
 
 def JobQueueRename(old, new):
   """Renames a job queue file.
 
-  This is just a wrapper over L{os.rename} with proper checking.
+  This is just a wrapper over os.rename with proper checking.
 
   @type old: str
   @param old: the old (actual) file name
 
   @type old: str
   @param old: the old (actual) file name
@@ -1953,7 +2264,7 @@ def JobQueueRename(old, new):
   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
     return False
 
   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
     return False
 
-  os.rename(old, new)
+  utils.RenameFile(old, new, mkdir=True)
 
   return True
 
 
   return True
 
@@ -1978,12 +2289,14 @@ def JobQueueSetDrainFlag(drain_flag):
   return True
 
 
   return True
 
 
-def CloseBlockDevices(disks):
+def BlockdevClose(instance_name, disks):
   """Closes the given block devices.
 
   This means they will be switched to secondary mode (in case of
   DRBD).
 
   """Closes the given block devices.
 
   This means they will be switched to secondary mode (in case of
   DRBD).
 
+  @param instance_name: if the argument is not empty, the symlinks
+      of this instance will be removed
   @type disks: list of L{objects.Disk}
   @param disks: the list of disks to be closed
   @rtype: tuple (success, message)
   @type disks: list of L{objects.Disk}
   @param disks: the list of disks to be closed
   @rtype: tuple (success, message)
@@ -2009,6 +2322,8 @@ def CloseBlockDevices(disks):
   if msg:
     return (False, "Can't make devices secondary: %s" % ",".join(msg))
   else:
   if msg:
     return (False, "Can't make devices secondary: %s" % ",".join(msg))
   else:
+    if instance_name:
+      _RemoveBlockDevLinks(instance_name, disks)
     return (True, "All devices secondary")
 
 
     return (True, "All devices secondary")
 
 
@@ -2034,6 +2349,149 @@ def ValidateHVParams(hvname, hvparams):
     return (False, str(err))
 
 
     return (False, str(err))
 
 
+def DemoteFromMC():
+  """Demotes the current node from master candidate role.
+
+  """
+  # try to ensure we're not the master by mistake
+  master, myself = ssconf.GetMasterAndMyself()
+  if master == myself:
+    return (False, "ssconf status shows I'm the master node, will not demote")
+  pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
+  if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
+    return (False, "The master daemon is running, will not demote")
+  try:
+    if os.path.isfile(constants.CLUSTER_CONF_FILE):
+      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
+  except EnvironmentError, err:
+    if err.errno != errno.ENOENT:
+      return (False, "Error while backing up cluster file: %s" % str(err))
+  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
+  return (True, "Done")
+
+
+def _FindDisks(nodes_ip, disks):
+  """Sets the physical ID on disks and returns the block devices.
+
+  """
+  # set the correct physical ID
+  my_name = utils.HostInfo().name
+  for cf in disks:
+    cf.SetPhysicalID(my_name, nodes_ip)
+
+  bdevs = []
+
+  for cf in disks:
+    rd = _RecursiveFindBD(cf)
+    if rd is None:
+      return (False, "Can't find device %s" % cf)
+    bdevs.append(rd)
+  return (True, bdevs)
+
+
+def DrbdDisconnectNet(nodes_ip, disks):
+  """Disconnects the network on a list of drbd devices.
+
+  """
+  status, bdevs = _FindDisks(nodes_ip, disks)
+  if not status:
+    return status, bdevs
+
+  # disconnect disks
+  for rd in bdevs:
+    try:
+      rd.DisconnectNet()
+    except errors.BlockDeviceError, err:
+      logging.exception("Failed to go into standalone mode")
+      return (False, "Can't change network configuration: %s" % str(err))
+  return (True, "All disks are now disconnected")
+
+
+def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
+  """Attaches the network on a list of drbd devices.
+
+  """
+  status, bdevs = _FindDisks(nodes_ip, disks)
+  if not status:
+    return status, bdevs
+
+  if multimaster:
+    for idx, rd in enumerate(bdevs):
+      try:
+        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
+      except EnvironmentError, err:
+        return (False, "Can't create symlink: %s" % str(err))
+  # reconnect disks, switch to new master configuration and if
+  # needed primary mode
+  for rd in bdevs:
+    try:
+      rd.AttachNet(multimaster)
+    except errors.BlockDeviceError, err:
+      return (False, "Can't change network configuration: %s" % str(err))
+  # wait until the disks are connected; we need to retry the re-attach
+  # if the device becomes standalone, as this might happen if the one
+  # node disconnects and reconnects in a different mode before the
+  # other node reconnects; in this case, one or both of the nodes will
+  # decide it has wrong configuration and switch to standalone
+  RECONNECT_TIMEOUT = 2 * 60
+  sleep_time = 0.100 # start with 100 miliseconds
+  timeout_limit = time.time() + RECONNECT_TIMEOUT
+  while time.time() < timeout_limit:
+    all_connected = True
+    for rd in bdevs:
+      stats = rd.GetProcStatus()
+      if not (stats.is_connected or stats.is_in_resync):
+        all_connected = False
+      if stats.is_standalone:
+        # peer had different config info and this node became
+        # standalone, even though this should not happen with the
+        # new staged way of changing disk configs
+        try:
+          rd.AttachNet(multimaster)
+        except errors.BlockDeviceError, err:
+          return (False, "Can't change network configuration: %s" % str(err))
+    if all_connected:
+      break
+    time.sleep(sleep_time)
+    sleep_time = min(5, sleep_time * 1.5)
+  if not all_connected:
+    return (False, "Timeout in disk reconnecting")
+  if multimaster:
+    # change to primary mode
+    for rd in bdevs:
+      try:
+        rd.Open()
+      except errors.BlockDeviceError, err:
+        return (False, "Can't change to primary mode: %s" % str(err))
+  if multimaster:
+    msg = "multi-master and primary"
+  else:
+    msg = "single-master"
+  return (True, "Disks are now configured as %s" % msg)
+
+
+def DrbdWaitSync(nodes_ip, disks):
+  """Wait until DRBDs have synchronized.
+
+  """
+  status, bdevs = _FindDisks(nodes_ip, disks)
+  if not status:
+    return status, bdevs
+
+  min_resync = 100
+  alldone = True
+  failure = False
+  for rd in bdevs:
+    stats = rd.GetProcStatus()
+    if not (stats.is_connected or stats.is_in_resync):
+      failure = True
+      break
+    alldone = alldone and (not stats.is_in_resync)
+    if stats.sync_percent is not None:
+      min_resync = min(min_resync, stats.sync_percent)
+  return (not failure, (alldone, min_resync))
+
+
 class HooksRunner(object):
   """Hook runner.
 
 class HooksRunner(object):
   """Hook runner.
 
@@ -2041,8 +2499,6 @@ class HooksRunner(object):
   on the master side.
 
   """
   on the master side.
 
   """
-  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
-
   def __init__(self, hooks_base_dir=None):
     """Constructor for hooks runner.
 
   def __init__(self, hooks_base_dir=None):
     """Constructor for hooks runner.
 
@@ -2103,7 +2559,7 @@ class HooksRunner(object):
             #logging.exception("Error while closing fd %s", fd)
             pass
 
             #logging.exception("Error while closing fd %s", fd)
             pass
 
-    return result == 0, output
+    return result == 0, utils.SafeEncode(output.strip())
 
   def RunHooks(self, hpath, phase, env):
     """Run the scripts in the hooks directory.
 
   def RunHooks(self, hpath, phase, env):
     """Run the scripts in the hooks directory.
@@ -2139,7 +2595,7 @@ class HooksRunner(object):
     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
     try:
       dir_contents = utils.ListVisibleFiles(dir_name)
     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
     try:
       dir_contents = utils.ListVisibleFiles(dir_name)
-    except OSError, err:
+    except OSError:
       # FIXME: must log output in case of failures
       return rr
 
       # FIXME: must log output in case of failures
       return rr
 
@@ -2149,7 +2605,7 @@ class HooksRunner(object):
     for relname in dir_contents:
       fname = os.path.join(dir_name, relname)
       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
     for relname in dir_contents:
       fname = os.path.join(dir_name, relname)
       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
-          self.RE_MASK.match(relname) is not None):
+              constants.EXT_PLUGIN_MASK.match(relname) is not None):
         rrval = constants.HKR_SKIP
         output = ""
       else:
         rrval = constants.HKR_SKIP
         output = ""
       else:
@@ -2244,7 +2700,7 @@ class DevCacheManager(object):
         node nor not
     @type iv_name: str
     @param iv_name: the instance-visible name of the
         node nor not
     @type iv_name: str
     @param iv_name: the instance-visible name of the
-        device, as in L{objects.Disk.iv_name}
+        device, as in objects.Disk.iv_name
 
     @rtype: None
 
 
     @rtype: None
 
@@ -2262,7 +2718,7 @@ class DevCacheManager(object):
     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
     try:
       utils.WriteFile(fpath, data=fdata)
     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
     try:
       utils.WriteFile(fpath, data=fdata)
-    except EnvironmentError, err:
+    except EnvironmentError:
       logging.exception("Can't update bdev cache for %s", dev_path)
 
   @classmethod
       logging.exception("Can't update bdev cache for %s", dev_path)
 
   @classmethod
@@ -2284,5 +2740,5 @@ class DevCacheManager(object):
     fpath = cls._ConvertPath(dev_path)
     try:
       utils.RemoveFile(fpath)
     fpath = cls._ConvertPath(dev_path)
     try:
       utils.RemoveFile(fpath)
-    except EnvironmentError, err:
+    except EnvironmentError:
       logging.exception("Can't update bdev cache for %s", dev_path)
       logging.exception("Can't update bdev cache for %s", dev_path)