Change BlockDev.Remove() failure result
[ganeti-local] / lib / backend.py
index 207de71..c9d2513 100644 (file)
@@ -33,6 +33,8 @@ import subprocess
 import random
 import logging
 import tempfile
+import zlib
+import base64
 
 from ganeti import errors
 from ganeti import utils
@@ -45,13 +47,13 @@ from ganeti import ssconf
 
 
 def _GetConfig():
-  """Simple wrapper to return a ConfigReader.
+  """Simple wrapper to return a SimpleStore.
 
-  @rtype: L{ssconf.SimpleConfigReader}
-  @return: a SimpleConfigReader instance
+  @rtype: L{ssconf.SimpleStore}
+  @return: a SimpleStore instance
 
   """
-  return ssconf.SimpleConfigReader()
+  return ssconf.SimpleStore()
 
 
 def _GetSshRunner(cluster_name):
@@ -67,7 +69,27 @@ def _GetSshRunner(cluster_name):
   return ssh.SshRunner(cluster_name)
 
 
-def _CleanDirectory(path, exclude=[]):
+def _Decompress(data):
+  """Unpacks data compressed by the RPC client.
+
+  @type data: list or tuple
+  @param data: Data sent by RPC client
+  @rtype: str
+  @return: Decompressed data
+
+  """
+  assert isinstance(data, (list, tuple))
+  assert len(data) == 2
+  (encoding, content) = data
+  if encoding == constants.RPC_ENCODING_NONE:
+    return content
+  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
+    return zlib.decompress(base64.b64decode(content))
+  else:
+    raise AssertionError("Unknown data encoding")
+
+
+def _CleanDirectory(path, exclude=None):
   """Removes all regular files in a directory.
 
   @type path: str
@@ -75,14 +97,15 @@ def _CleanDirectory(path, exclude=[]):
   @type exclude: list
   @param exclude: list of files to be excluded, defaults
       to the empty list
-  @rtype: None
 
   """
   if not os.path.isdir(path):
     return
-
-  # Normalize excluded paths
-  exclude = [os.path.normpath(i) for i in exclude]
+  if exclude is None:
+    exclude = []
+  else:
+    # Normalize excluded paths
+    exclude = [os.path.normpath(i) for i in exclude]
 
   for rel_name in utils.ListVisibleFiles(path):
     full_name = os.path.normpath(os.path.join(path, rel_name))
@@ -237,8 +260,9 @@ def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
                                                     mkdir=True)
   except errors.OpExecError, err:
-    logging.exception("Error while processing user ssh files")
-    return False
+    msg = "Error while processing user ssh files"
+    logging.exception(msg)
+    return (False, "%s: %s" % (msg, err))
 
   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
     utils.WriteFile(name, data=content, mode=0600)
@@ -247,7 +271,7 @@ def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
 
   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
 
-  return True
+  return (True, "Node added successfully")
 
 
 def LeaveCluster():
@@ -257,7 +281,7 @@ def LeaveCluster():
   from the cluster.
 
   If processing is successful, then it raises an
-  L{errors.GanetiQuitException} which is used as a special case to
+  L{errors.QuitGanetiException} which is used as a special case to
   shutdown the node daemon.
 
   """
@@ -350,37 +374,38 @@ def VerifyNode(what, cluster_name):
   """
   result = {}
 
-  if 'hypervisor' in what:
-    result['hypervisor'] = my_dict = {}
-    for hv_name in what['hypervisor']:
-      my_dict[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
+  if constants.NV_HYPERVISOR in what:
+    result[constants.NV_HYPERVISOR] = tmp = {}
+    for hv_name in what[constants.NV_HYPERVISOR]:
+      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
 
-  if 'filelist' in what:
-    result['filelist'] = utils.FingerprintFiles(what['filelist'])
+  if constants.NV_FILELIST in what:
+    result[constants.NV_FILELIST] = utils.FingerprintFiles(
+      what[constants.NV_FILELIST])
 
-  if 'nodelist' in what:
-    result['nodelist'] = {}
-    random.shuffle(what['nodelist'])
-    for node in what['nodelist']:
+  if constants.NV_NODELIST in what:
+    result[constants.NV_NODELIST] = tmp = {}
+    random.shuffle(what[constants.NV_NODELIST])
+    for node in what[constants.NV_NODELIST]:
       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
       if not success:
-        result['nodelist'][node] = message
-  if 'node-net-test' in what:
-    result['node-net-test'] = {}
+        tmp[node] = message
+
+  if constants.NV_NODENETTEST in what:
+    result[constants.NV_NODENETTEST] = tmp = {}
     my_name = utils.HostInfo().name
     my_pip = my_sip = None
-    for name, pip, sip in what['node-net-test']:
+    for name, pip, sip in what[constants.NV_NODENETTEST]:
       if name == my_name:
         my_pip = pip
         my_sip = sip
         break
     if not my_pip:
-      result['node-net-test'][my_name] = ("Can't find my own"
-                                          " primary/secondary IP"
-                                          " in the node list")
+      tmp[my_name] = ("Can't find my own primary/secondary IP"
+                      " in the node list")
     else:
       port = utils.GetNodeDaemonPort()
-      for name, pip, sip in what['node-net-test']:
+      for name, pip, sip in what[constants.NV_NODENETTEST]:
         fail = []
         if not utils.TcpPing(pip, port, source=my_pip):
           fail.append("primary")
@@ -388,9 +413,34 @@ def VerifyNode(what, cluster_name):
           if not utils.TcpPing(sip, port, source=my_sip):
             fail.append("secondary")
         if fail:
-          result['node-net-test'][name] = ("failure using the %s"
-                                           " interface(s)" %
-                                           " and ".join(fail))
+          tmp[name] = ("failure using the %s interface(s)" %
+                       " and ".join(fail))
+
+  if constants.NV_LVLIST in what:
+    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
+
+  if constants.NV_INSTANCELIST in what:
+    result[constants.NV_INSTANCELIST] = GetInstanceList(
+      what[constants.NV_INSTANCELIST])
+
+  if constants.NV_VGLIST in what:
+    result[constants.NV_VGLIST] = ListVolumeGroups()
+
+  if constants.NV_VERSION in what:
+    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
+                                    constants.RELEASE_VERSION)
+
+  if constants.NV_HVINFO in what:
+    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
+    result[constants.NV_HVINFO] = hyper.GetNodeInfo()
+
+  if constants.NV_DRBDLIST in what:
+    try:
+      used_minors = bdev.DRBD8.GetUsedDevs().keys()
+    except errors.BlockDeviceError:
+      logging.warning("Can't get used minors list", exc_info=True)
+      used_minors = []
+    result[constants.NV_DRBDLIST] = used_minors
 
   return result
 
@@ -557,6 +607,30 @@ def GetInstanceInfo(instance, hname):
   return output
 
 
+def GetInstanceMigratable(instance):
+  """Gives whether an instance can be migrated.
+
+  @type instance: L{objects.Instance}
+  @param instance: object representing the instance to be checked.
+
+  @rtype: tuple
+  @return: tuple of (result, description) where:
+      - result: whether the instance can be migrated or not
+      - description: a description of the issue, if relevant
+
+  """
+  hyper = hypervisor.GetHypervisor(instance.hypervisor)
+  if instance.name not in hyper.ListInstances():
+    return (False, 'not running')
+
+  for idx in range(len(instance.disks)):
+    link_name = _GetBlockDevSymlinkPath(instance.name, idx)
+    if not os.path.islink(link_name):
+      return (False, 'not restarted since ganeti 1.2.5')
+
+  return (True, '')
+
+
 def GetAllInstancesInfo(hypervisor_list):
   """Gather data about all instances.
 
@@ -567,7 +641,7 @@ def GetAllInstancesInfo(hypervisor_list):
   @type hypervisor_list: list
   @param hypervisor_list: list of hypervisors to query for instance data
 
-  @rtype: dict of dicts
+  @rtype: dict
   @return: dictionary of instance: data, with data having the following keys:
       - memory: memory size of instance (int)
       - state: xen state of instance (string)
@@ -595,7 +669,7 @@ def GetAllInstancesInfo(hypervisor_list):
   return output
 
 
-def AddOSToInstance(instance):
+def InstanceOsAdd(instance):
   """Add an OS to an instance.
 
   @type instance: L{objects.Instance}
@@ -604,27 +678,33 @@ def AddOSToInstance(instance):
   @return: the success of the operation
 
   """
-  inst_os = OSFromDisk(instance.os)
+  try:
+    inst_os = OSFromDisk(instance.os)
+  except errors.InvalidOS, err:
+    os_name, os_dir, os_err = err.args
+    if os_dir is None:
+      return (False, "Can't find OS '%s': %s" % (os_name, os_err))
+    else:
+      return (False, "Error parsing OS '%s' in directory %s: %s" %
+              (os_name, os_dir, os_err))
 
-  create_script = inst_os.create_script
   create_env = OSEnvironment(instance)
 
   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                      instance.name, int(time.time()))
-  if not os.path.exists(constants.LOG_OS_DIR):
-    os.mkdir(constants.LOG_OS_DIR, 0750)
 
-  command = utils.BuildShellCmd("cd %s && %s &>%s",
-                                inst_os.path, create_script, logfile)
-
-  result = utils.RunCmd(command, env=create_env)
+  result = utils.RunCmd([inst_os.create_script], env=create_env,
+                        cwd=inst_os.path, output=logfile,)
   if result.failed:
     logging.error("os create command '%s' returned error: %s, logfile: %s,"
-                  " output: %s", command, result.fail_reason, logfile,
+                  " output: %s", result.cmd, result.fail_reason, logfile,
                   result.output)
-    return False
+    lines = [utils.SafeEncode(val)
+             for val in utils.TailFile(logfile, lines=20)]
+    return (False, "OS create script failed (%s), last lines in the"
+            " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
 
-  return True
+  return (True, "Successfully installed")
 
 
 def RunRenameInstance(instance, old_name):
@@ -640,27 +720,25 @@ def RunRenameInstance(instance, old_name):
   """
   inst_os = OSFromDisk(instance.os)
 
-  script = inst_os.rename_script
   rename_env = OSEnvironment(instance)
   rename_env['OLD_INSTANCE_NAME'] = old_name
 
   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                            old_name,
                                            instance.name, int(time.time()))
-  if not os.path.exists(constants.LOG_OS_DIR):
-    os.mkdir(constants.LOG_OS_DIR, 0750)
-
-  command = utils.BuildShellCmd("cd %s && %s &>%s",
-                                inst_os.path, script, logfile)
 
-  result = utils.RunCmd(command, env=rename_env)
+  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
+                        cwd=inst_os.path, output=logfile)
 
   if result.failed:
     logging.error("os create command '%s' returned error: %s output: %s",
-                  command, result.fail_reason, result.output)
-    return False
+                  result.cmd, result.fail_reason, result.output)
+    lines = [utils.SafeEncode(val)
+             for val in utils.TailFile(logfile, lines=20)]
+    return (False, "OS rename script failed (%s), last lines in the"
+            " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
 
-  return True
+  return (True, "Rename successful")
 
 
 def _GetVGInfo(vg_name):
@@ -703,7 +781,53 @@ def _GetVGInfo(vg_name):
   return retdic
 
 
-def _GatherBlockDevs(instance):
+def _GetBlockDevSymlinkPath(instance_name, idx):
+  return os.path.join(constants.DISK_LINKS_DIR,
+                      "%s:%d" % (instance_name, idx))
+
+
+def _SymlinkBlockDev(instance_name, device_path, idx):
+  """Set up symlinks to a instance's block device.
+
+  This is an auxiliary function run when an instance is start (on the primary
+  node) or when an instance is migrated (on the target node).
+
+
+  @param instance_name: the name of the target instance
+  @param device_path: path of the physical block device, on the node
+  @param idx: the disk index
+  @return: absolute path to the disk's symlink
+
+  """
+  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
+  try:
+    os.symlink(device_path, link_name)
+  except OSError, err:
+    if err.errno == errno.EEXIST:
+      if (not os.path.islink(link_name) or
+          os.readlink(link_name) != device_path):
+        os.remove(link_name)
+        os.symlink(device_path, link_name)
+    else:
+      raise
+
+  return link_name
+
+
+def _RemoveBlockDevLinks(instance_name, disks):
+  """Remove the block device symlinks belonging to the given instance.
+
+  """
+  for idx, disk in enumerate(disks):
+    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
+    if os.path.islink(link_name):
+      try:
+        os.remove(link_name)
+      except OSError:
+        logging.exception("Can't remove symlink '%s'", link_name)
+
+
+def _GatherAndLinkBlockDevs(instance):
   """Set up an instance's block device(s).
 
   This is run on the primary node at instance startup. The block
@@ -711,18 +835,25 @@ def _GatherBlockDevs(instance):
 
   @type instance: L{objects.Instance}
   @param instance: the instance whose disks we shoul assemble
-  @rtype: list of L{bdev.BlockDev}
-  @return: list of the block devices
+  @rtype: list
+  @return: list of (disk_object, device_path)
 
   """
   block_devices = []
-  for disk in instance.disks:
+  for idx, disk in enumerate(instance.disks):
     device = _RecursiveFindBD(disk)
     if device is None:
       raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                     str(disk))
     device.Open()
-    block_devices.append((disk, device))
+    try:
+      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
+    except OSError, e:
+      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
+                                    e.strerror)
+
+    block_devices.append((disk, link_name))
+
   return block_devices
 
 
@@ -738,18 +869,21 @@ def StartInstance(instance, extra_args):
   running_instances = GetInstanceList([instance.hypervisor])
 
   if instance.name in running_instances:
-    return True
-
-  block_devices = _GatherBlockDevs(instance)
-  hyper = hypervisor.GetHypervisor(instance.hypervisor)
+    return (True, "Already running")
 
   try:
+    block_devices = _GatherAndLinkBlockDevs(instance)
+    hyper = hypervisor.GetHypervisor(instance.hypervisor)
     hyper.StartInstance(instance, block_devices, extra_args)
+  except errors.BlockDeviceError, err:
+    logging.exception("Failed to start instance")
+    return (False, "Block device error: %s" % str(err))
   except errors.HypervisorError, err:
     logging.exception("Failed to start instance")
-    return False
+    _RemoveBlockDevLinks(instance.name, instance.disks)
+    return (False, "Hypervisor error: %s" % str(err))
 
-  return True
+  return (True, "Instance started successfully")
 
 
 def ShutdownInstance(instance):
@@ -773,11 +907,10 @@ def ShutdownInstance(instance):
   try:
     hyper.StopInstance(instance)
   except errors.HypervisorError, err:
-    logging.error("Failed to stop instance")
+    logging.error("Failed to stop instance: %s" % err)
     return False
 
   # test every 10secs for 2min
-  shutdown_ok = False
 
   time.sleep(1)
   for dummy in range(11):
@@ -786,20 +919,23 @@ def ShutdownInstance(instance):
     time.sleep(10)
   else:
     # the shutdown did not succeed
-    logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
+    logging.error("Shutdown of '%s' unsuccessful, using destroy",
+                  instance.name)
 
     try:
       hyper.StopInstance(instance, force=True)
     except errors.HypervisorError, err:
-      logging.exception("Failed to stop instance")
+      logging.exception("Failed to stop instance: %s" % err)
       return False
 
     time.sleep(1)
     if instance.name in GetInstanceList([hv_name]):
-      logging.error("could not shutdown instance '%s' even by destroy",
+      logging.error("Could not shutdown instance '%s' even by destroy",
                     instance.name)
       return False
 
+  _RemoveBlockDevLinks(instance.name, instance.disks)
+
   return True
 
 
@@ -848,6 +984,65 @@ def RebootInstance(instance, reboot_type, extra_args):
   return True
 
 
+def MigrationInfo(instance):
+  """Gather information about an instance to be migrated.
+
+  @type instance: L{objects.Instance}
+  @param instance: the instance definition
+
+  """
+  hyper = hypervisor.GetHypervisor(instance.hypervisor)
+  try:
+    info = hyper.MigrationInfo(instance)
+  except errors.HypervisorError, err:
+    msg = "Failed to fetch migration information"
+    logging.exception(msg)
+    return (False, '%s: %s' % (msg, err))
+  return (True, info)
+
+
+def AcceptInstance(instance, info, target):
+  """Prepare the node to accept an instance.
+
+  @type instance: L{objects.Instance}
+  @param instance: the instance definition
+  @type info: string/data (opaque)
+  @param info: migration information, from the source node
+  @type target: string
+  @param target: target host (usually ip), on this node
+
+  """
+  hyper = hypervisor.GetHypervisor(instance.hypervisor)
+  try:
+    hyper.AcceptInstance(instance, info, target)
+  except errors.HypervisorError, err:
+    msg = "Failed to accept instance"
+    logging.exception(msg)
+    return (False, '%s: %s' % (msg, err))
+  return (True, "Accept successfull")
+
+
+def FinalizeMigration(instance, info, success):
+  """Finalize any preparation to accept an instance.
+
+  @type instance: L{objects.Instance}
+  @param instance: the instance definition
+  @type info: string/data (opaque)
+  @param info: migration information, from the source node
+  @type success: boolean
+  @param success: whether the migration was a success or a failure
+
+  """
+  hyper = hypervisor.GetHypervisor(instance.hypervisor)
+  try:
+    hyper.FinalizeMigration(instance, info, success)
+  except errors.HypervisorError, err:
+    msg = "Failed to finalize migration"
+    logging.exception(msg)
+    return (False, '%s: %s' % (msg, err))
+  return (True, "Migration Finalized")
+
+
 def MigrateInstance(instance, target, live):
   """Migrates an instance to another node.
 
@@ -864,18 +1059,18 @@ def MigrateInstance(instance, target, live):
       - msg is a string with details in case of failure
 
   """
-  hyper = hypervisor.GetHypervisor(instance.hypervisor_name)
+  hyper = hypervisor.GetHypervisor(instance.hypervisor)
 
   try:
     hyper.MigrateInstance(instance.name, target, live)
   except errors.HypervisorError, err:
-    msg = "Failed to migrate instance: %s" % str(err)
-    logging.error(msg)
-    return (False, msg)
+    msg = "Failed to migrate instance"
+    logging.exception(msg)
+    return (False, "%s: %s" % (msg, err))
   return (True, "Migration successfull")
 
 
-def CreateBlockDevice(disk, size, owner, on_primary, info):
+def BlockdevCreate(disk, size, owner, on_primary, info):
   """Creates a block device for an instance.
 
   @type disk: L{objects.Disk}
@@ -905,25 +1100,17 @@ def CreateBlockDevice(disk, size, owner, on_primary, info):
         # be assembled
         crdev.Open()
       clist.append(crdev)
+
   try:
-    device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
-    if device is not None:
-      logging.info("removing existing device %s", disk)
-      device.Remove()
-  except errors.BlockDeviceError, err:
-    pass
+    device = bdev.Create(disk.dev_type, disk.physical_id, clist, size)
+  except errors.GenericError, err:
+    return False, "Can't create block device: %s" % str(err)
 
-  device = bdev.Create(disk.dev_type, disk.physical_id,
-                       clist, size)
-  if device is None:
-    raise ValueError("Can't create child device for %s, %s" %
-                     (disk, size))
   if on_primary or disk.AssembleOnSecondary():
     if not device.Assemble():
-      errorstring = "Can't assemble device after creation"
+      errorstring = "Can't assemble device after creation, very unusual event"
       logging.error(errorstring)
-      raise errors.BlockDeviceError("%s, very unusual event - check the node"
-                                    " daemon logs" % errorstring)
+      return False, errorstring
     device.SetSyncSpeed(constants.SYNC_SPEED)
     if on_primary or disk.OpenOnSecondary():
       device.Open(force=True)
@@ -933,39 +1120,46 @@ def CreateBlockDevice(disk, size, owner, on_primary, info):
   device.SetInfo(info)
 
   physical_id = device.unique_id
-  return physical_id
+  return True, physical_id
 
 
-def RemoveBlockDevice(disk):
+def BlockdevRemove(disk):
   """Remove a block device.
 
   @note: This is intended to be called recursively.
 
-  @type disk: L{objects.disk}
+  @type disk: L{objects.Disk}
   @param disk: the disk object we should remove
   @rtype: boolean
   @return: the success of the operation
 
   """
+  msgs = []
+  result = True
   try:
-    # since we are removing the device, allow a partial match
-    # this allows removal of broken mirrors
-    rdev = _RecursiveFindBD(disk, allow_partial=True)
+    rdev = _RecursiveFindBD(disk)
   except errors.BlockDeviceError, err:
     # probably can't attach
     logging.info("Can't attach to device %s in remove", disk)
     rdev = None
   if rdev is not None:
     r_path = rdev.dev_path
-    result = rdev.Remove()
+    try:
+      rdev.Remove()
+    except errors.BlockDeviceError, err:
+      msgs.append(str(err))
+      result = False
     if result:
       DevCacheManager.RemoveCache(r_path)
-  else:
-    result = True
+
   if disk.children:
     for child in disk.children:
-      result = result and RemoveBlockDevice(child)
-  return result
+      c_status, c_msg = BlockdevRemove(child)
+      result = result and c_status
+      if c_msg: # not an empty message
+        msgs.append(c_msg)
+
+  return (result, "; ".join(msgs))
 
 
 def _RecursiveAssembleBD(disk, owner, as_primary):
@@ -1004,11 +1198,11 @@ def _RecursiveAssembleBD(disk, owner, as_primary):
         if children.count(None) >= mcn:
           raise
         cdev = None
-        logging.debug("Error in child activation: %s", str(err))
+        logging.error("Error in child activation: %s", str(err))
       children.append(cdev)
 
   if as_primary or disk.AssembleOnSecondary():
-    r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
+    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children)
     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
     result = r_dev
     if as_primary or disk.OpenOnSecondary():
@@ -1021,7 +1215,7 @@ def _RecursiveAssembleBD(disk, owner, as_primary):
   return result
 
 
-def AssembleBlockDevice(disk, owner, as_primary):
+def BlockdevAssemble(disk, owner, as_primary):
   """Activate a block device for an instance.
 
   This is a wrapper over _RecursiveAssembleBD.
@@ -1031,17 +1225,26 @@ def AssembleBlockDevice(disk, owner, as_primary):
       C{True} for secondary nodes
 
   """
-  result = _RecursiveAssembleBD(disk, owner, as_primary)
-  if isinstance(result, bdev.BlockDev):
-    result = result.dev_path
-  return result
+  status = False
+  result = "no error information"
+  try:
+    result = _RecursiveAssembleBD(disk, owner, as_primary)
+    if isinstance(result, bdev.BlockDev):
+      result = result.dev_path
+      status = True
+    if result == True:
+      status = True
+  except errors.BlockDeviceError, err:
+    result = "Error while assembling disk: %s" % str(err)
+  return (status, result)
 
 
-def ShutdownBlockDevice(disk):
+def BlockdevShutdown(disk):
   """Shut down a block device.
 
-  First, if the device is assembled (can L{Attach()}), then the device
-  is shutdown. Then the children of the device are shutdown.
+  First, if the device is assembled (Attach() is successfull), then
+  the device is shutdown. Then the children of the device are
+  shutdown.
 
   This function is called recursively. Note that we don't cache the
   children or such, as oppossed to assemble, shutdown of different
@@ -1054,21 +1257,29 @@ def ShutdownBlockDevice(disk):
   @return: the success of the operation
 
   """
+  msgs = []
+  result = True
   r_dev = _RecursiveFindBD(disk)
   if r_dev is not None:
     r_path = r_dev.dev_path
-    result = r_dev.Shutdown()
-    if result:
+    try:
+      r_dev.Shutdown()
       DevCacheManager.RemoveCache(r_path)
-  else:
-    result = True
+    except errors.BlockDeviceError, err:
+      msgs.append(str(err))
+      result = False
+
   if disk.children:
     for child in disk.children:
-      result = result and ShutdownBlockDevice(child)
-  return result
+      c_status, c_msg = BlockdevShutdown(child)
+      result = result and c_status
+      if c_msg: # not an empty message
+        msgs.append(c_msg)
+
+  return (result, "; ".join(msgs))
 
 
-def MirrorAddChildren(parent_cdev, new_cdevs):
+def BlockdevAddchildren(parent_cdev, new_cdevs):
   """Extend a mirrored block device.
 
   @type parent_cdev: L{objects.Disk}
@@ -1079,7 +1290,7 @@ def MirrorAddChildren(parent_cdev, new_cdevs):
   @return: the success of the operation
 
   """
-  parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
+  parent_bdev = _RecursiveFindBD(parent_cdev)
   if parent_bdev is None:
     logging.error("Can't find parent device")
     return False
@@ -1092,7 +1303,7 @@ def MirrorAddChildren(parent_cdev, new_cdevs):
   return True
 
 
-def MirrorRemoveChildren(parent_cdev, new_cdevs):
+def BlockdevRemovechildren(parent_cdev, new_cdevs):
   """Shrink a mirrored block device.
 
   @type parent_cdev: L{objects.Disk}
@@ -1124,7 +1335,7 @@ def MirrorRemoveChildren(parent_cdev, new_cdevs):
   return True
 
 
-def GetMirrorStatus(disks):
+def BlockdevGetmirrorstatus(disks):
   """Get the mirroring status of a list of devices.
 
   @type disks: list of L{objects.Disk}
@@ -1132,7 +1343,7 @@ def GetMirrorStatus(disks):
   @rtype: disk
   @return:
       a list of (mirror_done, estimated_time) tuples, which
-      are the result of L{bdev.BlockDevice.CombinedSyncStatus}
+      are the result of L{bdev.BlockDev.CombinedSyncStatus}
   @raise errors.BlockDeviceError: if any of the disks cannot be
       found
 
@@ -1146,17 +1357,13 @@ def GetMirrorStatus(disks):
   return stats
 
 
-def _RecursiveFindBD(disk, allow_partial=False):
+def _RecursiveFindBD(disk):
   """Check if a device is activated.
 
   If so, return informations about the real device.
 
   @type disk: L{objects.Disk}
   @param disk: the disk object we need to find
-  @type allow_partial: boolean
-  @param allow_partial: if true, don't abort the find if a
-      child of the device can't be found; this is intended
-      to be used when repairing mirrors
 
   @return: None if the device can't be found,
       otherwise the device instance
@@ -1170,7 +1377,7 @@ def _RecursiveFindBD(disk, allow_partial=False):
   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
 
 
-def FindBlockDevice(disk):
+def BlockdevFind(disk):
   """Check if a device is activated.
 
   If it is, return informations about the real device.
@@ -1183,10 +1390,13 @@ def FindBlockDevice(disk):
       estimated_time, is_degraded)
 
   """
-  rbd = _RecursiveFindBD(disk)
+  try:
+    rbd = _RecursiveFindBD(disk)
+  except errors.BlockDeviceError, err:
+    return (False, str(err))
   if rbd is None:
-    return rbd
-  return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
+    return (True, None)
+  return (True, (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus())
 
 
 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
@@ -1231,11 +1441,22 @@ def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
                  " upload targets: '%s'", file_name)
     return False
 
-  utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
+  raw_data = _Decompress(data)
+
+  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
                   atime=atime, mtime=mtime)
   return True
 
 
+def WriteSsconfFiles(values):
+  """Update all ssconf files.
+
+  Wrapper around the SimpleStore.WriteFiles.
+
+  """
+  ssconf.SimpleStore().WriteFiles(values)
+
+
 def _ErrnoOrStr(err):
   """Format an EnvironmentError exception.
 
@@ -1409,6 +1630,7 @@ def OSEnvironment(instance, debug=0):
   result = {}
   result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
   result['INSTANCE_NAME'] = instance.name
+  result['INSTANCE_OS'] = instance.os
   result['HYPERVISOR'] = instance.hypervisor
   result['DISK_COUNT'] = '%d' % len(instance.disks)
   result['NIC_COUNT'] = '%d' % len(instance.nics)
@@ -1421,7 +1643,7 @@ def OSEnvironment(instance, debug=0):
     real_disk.Open()
     result['DISK_%d_PATH' % idx] = real_disk.dev_path
     # FIXME: When disks will have read-only mode, populate this
-    result['DISK_%d_ACCESS' % idx] = 'W'
+    result['DISK_%d_ACCESS' % idx] = disk.mode
     if constants.HV_DISK_TYPE in instance.hvparams:
       result['DISK_%d_FRONTEND_TYPE' % idx] = \
         instance.hvparams[constants.HV_DISK_TYPE]
@@ -1441,7 +1663,7 @@ def OSEnvironment(instance, debug=0):
 
   return result
 
-def GrowBlockDevice(disk, amount):
+def BlockdevGrow(disk, amount):
   """Grow a stack of block devices.
 
   This function is called recursively, with the childrens being the
@@ -1467,7 +1689,7 @@ def GrowBlockDevice(disk, amount):
   return True, None
 
 
-def SnapshotBlockDevice(disk):
+def BlockdevSnapshot(disk):
   """Create a snapshot copy of a block device.
 
   This function is called recursively, and the snapshot is actually created
@@ -1482,13 +1704,13 @@ def SnapshotBlockDevice(disk):
   if disk.children:
     if len(disk.children) == 1:
       # only one child, let's recurse on it
-      return SnapshotBlockDevice(disk.children[0])
+      return BlockdevSnapshot(disk.children[0])
     else:
       # more than one child, choose one that matches
       for child in disk.children:
         if child.size == disk.size:
           # return implies breaking the loop
-          return SnapshotBlockDevice(child)
+          return BlockdevSnapshot(child)
   elif disk.dev_type == constants.LD_LV:
     r_dev = _RecursiveFindBD(disk)
     if r_dev is not None:
@@ -1602,34 +1824,32 @@ def FinalizeExport(instance, snap_disks):
              instance.beparams[constants.BE_VCPUS])
   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
 
-  nic_count = 0
+  nic_total = 0
   for nic_count, nic in enumerate(instance.nics):
+    nic_total += 1
     config.set(constants.INISECT_INS, 'nic%d_mac' %
                nic_count, '%s' % nic.mac)
     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
                '%s' % nic.bridge)
   # TODO: redundant: on load can read nics until it doesn't exist
-  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
+  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
 
-  disk_count = 0
+  disk_total = 0
   for disk_count, disk in enumerate(snap_disks):
     if disk:
+      disk_total += 1
       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
                  ('%s' % disk.iv_name))
       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
                  ('%s' % disk.physical_id[1]))
       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
                  ('%d' % disk.size))
-  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_count)
 
-  cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
-  cfo = open(cff, 'w')
-  try:
-    config.write(cfo)
-  finally:
-    cfo.close()
+  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
 
+  utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
+                  data=config.Dumps())
   shutil.rmtree(finaldestdir, True)
   shutil.move(destdir, finaldestdir)
 
@@ -1682,8 +1902,8 @@ def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
     os.mkdir(constants.LOG_OS_DIR, 0750)
 
   comprcmd = "gunzip"
-  impcmd = utils.BuildShellCmd("(cd %s; %s &>%s)", inst_os.path, import_script,
-                               logfile)
+  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
+                               import_script, logfile)
 
   final_result = []
   for idx, image in enumerate(src_images):
@@ -1697,8 +1917,9 @@ def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
       import_env['IMPORT_INDEX'] = str(idx)
       result = utils.RunCmd(command, env=import_env)
       if result.failed:
-        logging.error("disk import command '%s' returned error: %s"
-                      " output: %s", command, result.fail_reason, result.output)
+        logging.error("Disk import command '%s' returned error: %s"
+                      " output: %s", command, result.fail_reason,
+                      result.output)
         final_result.append(False)
       else:
         final_result.append(True)
@@ -1739,7 +1960,7 @@ def RemoveExport(export):
   return True
 
 
-def RenameBlockDevices(devlist):
+def BlockdevRename(devlist):
   """Rename a list of block devices.
 
   @type devlist: list of tuples
@@ -1935,7 +2156,7 @@ def JobQueueUpdate(file_name, content):
     return False
 
   # Write and replace the file atomically
-  utils.WriteFile(file_name, data=content)
+  utils.WriteFile(file_name, data=_Decompress(content))
 
   return True
 
@@ -1943,7 +2164,7 @@ def JobQueueUpdate(file_name, content):
 def JobQueueRename(old, new):
   """Renames a job queue file.
 
-  This is just a wrapper over L{os.rename} with proper checking.
+  This is just a wrapper over os.rename with proper checking.
 
   @type old: str
   @param old: the old (actual) file name
@@ -1956,7 +2177,7 @@ def JobQueueRename(old, new):
   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
     return False
 
-  os.rename(old, new)
+  utils.RenameFile(old, new, mkdir=True)
 
   return True
 
@@ -1981,12 +2202,14 @@ def JobQueueSetDrainFlag(drain_flag):
   return True
 
 
-def CloseBlockDevices(disks):
+def BlockdevClose(instance_name, disks):
   """Closes the given block devices.
 
   This means they will be switched to secondary mode (in case of
   DRBD).
 
+  @param instance_name: if the argument is not empty, the symlinks
+      of this instance will be removed
   @type disks: list of L{objects.Disk}
   @param disks: the list of disks to be closed
   @rtype: tuple (success, message)
@@ -2012,6 +2235,8 @@ def CloseBlockDevices(disks):
   if msg:
     return (False, "Can't make devices secondary: %s" % ",".join(msg))
   else:
+    if instance_name:
+      _RemoveBlockDevLinks(instance_name, disks)
     return (True, "All devices secondary")
 
 
@@ -2037,6 +2262,148 @@ def ValidateHVParams(hvname, hvparams):
     return (False, str(err))
 
 
+def DemoteFromMC():
+  """Demotes the current node from master candidate role.
+
+  """
+  # try to ensure we're not the master by mistake
+  master, myself = ssconf.GetMasterAndMyself()
+  if master == myself:
+    return (False, "ssconf status shows I'm the master node, will not demote")
+  pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
+  if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
+    return (False, "The master daemon is running, will not demote")
+  try:
+    utils.CreateBackup(constants.CLUSTER_CONF_FILE)
+  except EnvironmentError, err:
+    if err.errno != errno.ENOENT:
+      return (False, "Error while backing up cluster file: %s" % str(err))
+  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
+  return (True, "Done")
+
+
+def _FindDisks(nodes_ip, disks):
+  """Sets the physical ID on disks and returns the block devices.
+
+  """
+  # set the correct physical ID
+  my_name = utils.HostInfo().name
+  for cf in disks:
+    cf.SetPhysicalID(my_name, nodes_ip)
+
+  bdevs = []
+
+  for cf in disks:
+    rd = _RecursiveFindBD(cf)
+    if rd is None:
+      return (False, "Can't find device %s" % cf)
+    bdevs.append(rd)
+  return (True, bdevs)
+
+
+def DrbdDisconnectNet(nodes_ip, disks):
+  """Disconnects the network on a list of drbd devices.
+
+  """
+  status, bdevs = _FindDisks(nodes_ip, disks)
+  if not status:
+    return status, bdevs
+
+  # disconnect disks
+  for rd in bdevs:
+    try:
+      rd.DisconnectNet()
+    except errors.BlockDeviceError, err:
+      logging.exception("Failed to go into standalone mode")
+      return (False, "Can't change network configuration: %s" % str(err))
+  return (True, "All disks are now disconnected")
+
+
+def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
+  """Attaches the network on a list of drbd devices.
+
+  """
+  status, bdevs = _FindDisks(nodes_ip, disks)
+  if not status:
+    return status, bdevs
+
+  if multimaster:
+    for idx, rd in enumerate(bdevs):
+      try:
+        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
+      except EnvironmentError, err:
+        return (False, "Can't create symlink: %s" % str(err))
+  # reconnect disks, switch to new master configuration and if
+  # needed primary mode
+  for rd in bdevs:
+    try:
+      rd.AttachNet(multimaster)
+    except errors.BlockDeviceError, err:
+      return (False, "Can't change network configuration: %s" % str(err))
+  # wait until the disks are connected; we need to retry the re-attach
+  # if the device becomes standalone, as this might happen if the one
+  # node disconnects and reconnects in a different mode before the
+  # other node reconnects; in this case, one or both of the nodes will
+  # decide it has wrong configuration and switch to standalone
+  RECONNECT_TIMEOUT = 2 * 60
+  sleep_time = 0.100 # start with 100 miliseconds
+  timeout_limit = time.time() + RECONNECT_TIMEOUT
+  while time.time() < timeout_limit:
+    all_connected = True
+    for rd in bdevs:
+      stats = rd.GetProcStatus()
+      if not (stats.is_connected or stats.is_in_resync):
+        all_connected = False
+      if stats.is_standalone:
+        # peer had different config info and this node became
+        # standalone, even though this should not happen with the
+        # new staged way of changing disk configs
+        try:
+          rd.ReAttachNet(multimaster)
+        except errors.BlockDeviceError, err:
+          return (False, "Can't change network configuration: %s" % str(err))
+    if all_connected:
+      break
+    time.sleep(sleep_time)
+    sleep_time = min(5, sleep_time * 1.5)
+  if not all_connected:
+    return (False, "Timeout in disk reconnecting")
+  if multimaster:
+    # change to primary mode
+    for rd in bdevs:
+      try:
+        rd.Open()
+      except errors.BlockDeviceError, err:
+        return (False, "Can't change to primary mode: %s" % str(err))
+  if multimaster:
+    msg = "multi-master and primary"
+  else:
+    msg = "single-master"
+  return (True, "Disks are now configured as %s" % msg)
+
+
+def DrbdWaitSync(nodes_ip, disks):
+  """Wait until DRBDs have synchronized.
+
+  """
+  status, bdevs = _FindDisks(nodes_ip, disks)
+  if not status:
+    return status, bdevs
+
+  min_resync = 100
+  alldone = True
+  failure = False
+  for rd in bdevs:
+    stats = rd.GetProcStatus()
+    if not (stats.is_connected or stats.is_in_resync):
+      failure = True
+      break
+    alldone = alldone and (not stats.is_in_resync)
+    if stats.sync_percent is not None:
+      min_resync = min(min_resync, stats.sync_percent)
+  return (not failure, (alldone, min_resync))
+
+
 class HooksRunner(object):
   """Hook runner.
 
@@ -2106,7 +2473,7 @@ class HooksRunner(object):
             #logging.exception("Error while closing fd %s", fd)
             pass
 
-    return result == 0, output
+    return result == 0, utils.SafeEncode(output.strip())
 
   def RunHooks(self, hpath, phase, env):
     """Run the scripts in the hooks directory.
@@ -2247,7 +2614,7 @@ class DevCacheManager(object):
         node nor not
     @type iv_name: str
     @param iv_name: the instance-visible name of the
-        device, as in L{objects.Disk.iv_name}
+        device, as in objects.Disk.iv_name
 
     @rtype: None