Implement software release version checks too
[ganeti-local] / lib / backend.py
index 89c1b2b..c956ecc 100644 (file)
@@ -33,6 +33,8 @@ import subprocess
 import random
 import logging
 import tempfile
 import random
 import logging
 import tempfile
+import zlib
+import base64
 
 from ganeti import errors
 from ganeti import utils
 
 from ganeti import errors
 from ganeti import utils
@@ -67,7 +69,27 @@ def _GetSshRunner(cluster_name):
   return ssh.SshRunner(cluster_name)
 
 
   return ssh.SshRunner(cluster_name)
 
 
-def _CleanDirectory(path, exclude=[]):
+def _Decompress(data):
+  """Unpacks data compressed by the RPC client.
+
+  @type data: list or tuple
+  @param data: Data sent by RPC client
+  @rtype: str
+  @return: Decompressed data
+
+  """
+  assert isinstance(data, (list, tuple))
+  assert len(data) == 2
+  (encoding, content) = data
+  if encoding == constants.RPC_ENCODING_NONE:
+    return content
+  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
+    return zlib.decompress(base64.b64decode(content))
+  else:
+    raise AssertionError("Unknown data encoding")
+
+
+def _CleanDirectory(path, exclude=None):
   """Removes all regular files in a directory.
 
   @type path: str
   """Removes all regular files in a directory.
 
   @type path: str
@@ -75,14 +97,15 @@ def _CleanDirectory(path, exclude=[]):
   @type exclude: list
   @param exclude: list of files to be excluded, defaults
       to the empty list
   @type exclude: list
   @param exclude: list of files to be excluded, defaults
       to the empty list
-  @rtype: None
 
   """
   if not os.path.isdir(path):
     return
 
   """
   if not os.path.isdir(path):
     return
-
-  # Normalize excluded paths
-  exclude = [os.path.normpath(i) for i in exclude]
+  if exclude is None:
+    exclude = []
+  else:
+    # Normalize excluded paths
+    exclude = [os.path.normpath(i) for i in exclude]
 
   for rel_name in utils.ListVisibleFiles(path):
     full_name = os.path.normpath(os.path.join(path, rel_name))
 
   for rel_name in utils.ListVisibleFiles(path):
     full_name = os.path.normpath(os.path.join(path, rel_name))
@@ -257,7 +280,7 @@ def LeaveCluster():
   from the cluster.
 
   If processing is successful, then it raises an
   from the cluster.
 
   If processing is successful, then it raises an
-  L{errors.GanetiQuitException} which is used as a special case to
+  L{errors.QuitGanetiException} which is used as a special case to
   shutdown the node daemon.
 
   """
   shutdown the node daemon.
 
   """
@@ -350,37 +373,38 @@ def VerifyNode(what, cluster_name):
   """
   result = {}
 
   """
   result = {}
 
-  if 'hypervisor' in what:
-    result['hypervisor'] = my_dict = {}
-    for hv_name in what['hypervisor']:
-      my_dict[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
+  if constants.NV_HYPERVISOR in what:
+    result[constants.NV_HYPERVISOR] = tmp = {}
+    for hv_name in what[constants.NV_HYPERVISOR]:
+      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
 
 
-  if 'filelist' in what:
-    result['filelist'] = utils.FingerprintFiles(what['filelist'])
+  if constants.NV_FILELIST in what:
+    result[constants.NV_FILELIST] = utils.FingerprintFiles(
+      what[constants.NV_FILELIST])
 
 
-  if 'nodelist' in what:
-    result['nodelist'] = {}
-    random.shuffle(what['nodelist'])
-    for node in what['nodelist']:
+  if constants.NV_NODELIST in what:
+    result[constants.NV_NODELIST] = tmp = {}
+    random.shuffle(what[constants.NV_NODELIST])
+    for node in what[constants.NV_NODELIST]:
       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
       if not success:
       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
       if not success:
-        result['nodelist'][node] = message
-  if 'node-net-test' in what:
-    result['node-net-test'] = {}
+        tmp[node] = message
+
+  if constants.NV_NODENETTEST in what:
+    result[constants.NV_NODENETTEST] = tmp = {}
     my_name = utils.HostInfo().name
     my_pip = my_sip = None
     my_name = utils.HostInfo().name
     my_pip = my_sip = None
-    for name, pip, sip in what['node-net-test']:
+    for name, pip, sip in what[constants.NV_NODENETTEST]:
       if name == my_name:
         my_pip = pip
         my_sip = sip
         break
     if not my_pip:
       if name == my_name:
         my_pip = pip
         my_sip = sip
         break
     if not my_pip:
-      result['node-net-test'][my_name] = ("Can't find my own"
-                                          " primary/secondary IP"
-                                          " in the node list")
+      tmp[my_name] = ("Can't find my own primary/secondary IP"
+                      " in the node list")
     else:
       port = utils.GetNodeDaemonPort()
     else:
       port = utils.GetNodeDaemonPort()
-      for name, pip, sip in what['node-net-test']:
+      for name, pip, sip in what[constants.NV_NODENETTEST]:
         fail = []
         if not utils.TcpPing(pip, port, source=my_pip):
           fail.append("primary")
         fail = []
         if not utils.TcpPing(pip, port, source=my_pip):
           fail.append("primary")
@@ -388,9 +412,34 @@ def VerifyNode(what, cluster_name):
           if not utils.TcpPing(sip, port, source=my_sip):
             fail.append("secondary")
         if fail:
           if not utils.TcpPing(sip, port, source=my_sip):
             fail.append("secondary")
         if fail:
-          result['node-net-test'][name] = ("failure using the %s"
-                                           " interface(s)" %
-                                           " and ".join(fail))
+          tmp[name] = ("failure using the %s interface(s)" %
+                       " and ".join(fail))
+
+  if constants.NV_LVLIST in what:
+    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
+
+  if constants.NV_INSTANCELIST in what:
+    result[constants.NV_INSTANCELIST] = GetInstanceList(
+      what[constants.NV_INSTANCELIST])
+
+  if constants.NV_VGLIST in what:
+    result[constants.NV_VGLIST] = ListVolumeGroups()
+
+  if constants.NV_VERSION in what:
+    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
+                                    constants.RELEASE_VERSION)
+
+  if constants.NV_HVINFO in what:
+    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
+    result[constants.NV_HVINFO] = hyper.GetNodeInfo()
+
+  if constants.NV_DRBDLIST in what:
+    try:
+      used_minors = bdev.DRBD8.GetUsedDevs().keys()
+    except errors.BlockDeviceError:
+      logging.warning("Can't get used minors list", exc_info=True)
+      used_minors = []
+    result[constants.NV_DRBDLIST] = used_minors
 
   return result
 
 
   return result
 
@@ -557,6 +606,30 @@ def GetInstanceInfo(instance, hname):
   return output
 
 
   return output
 
 
+def GetInstanceMigratable(instance):
+  """Gives whether an instance can be migrated.
+
+  @type instance: L{objects.Instance}
+  @param instance: object representing the instance to be checked.
+
+  @rtype: tuple
+  @return: tuple of (result, description) where:
+      - result: whether the instance can be migrated or not
+      - description: a description of the issue, if relevant
+
+  """
+  hyper = hypervisor.GetHypervisor(instance.hypervisor)
+  if instance.name not in hyper.ListInstances():
+    return (False, 'not running')
+
+  for idx in range(len(instance.disks)):
+    link_name = _GetBlockDevSymlinkPath(instance.name, idx)
+    if not os.path.islink(link_name):
+      return (False, 'not restarted since ganeti 1.2.5')
+
+  return (True, '')
+
+
 def GetAllInstancesInfo(hypervisor_list):
   """Gather data about all instances.
 
 def GetAllInstancesInfo(hypervisor_list):
   """Gather data about all instances.
 
@@ -617,9 +690,12 @@ def AddOSToInstance(instance):
     logging.error("os create command '%s' returned error: %s, logfile: %s,"
                   " output: %s", result.cmd, result.fail_reason, logfile,
                   result.output)
     logging.error("os create command '%s' returned error: %s, logfile: %s,"
                   " output: %s", result.cmd, result.fail_reason, logfile,
                   result.output)
-    return False
+    lines = [val.encode("string_escape")
+             for val in utils.TailFile(logfile, lines=20)]
+    return (False, "OS create script failed (%s), last lines in the"
+            " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
 
 
-  return True
+  return (True, "Successfully installed")
 
 
 def RunRenameInstance(instance, old_name):
 
 
 def RunRenameInstance(instance, old_name):
@@ -648,9 +724,12 @@ def RunRenameInstance(instance, old_name):
   if result.failed:
     logging.error("os create command '%s' returned error: %s output: %s",
                   result.cmd, result.fail_reason, result.output)
   if result.failed:
     logging.error("os create command '%s' returned error: %s output: %s",
                   result.cmd, result.fail_reason, result.output)
-    return False
+    lines = [val.encode("string_escape")
+             for val in utils.TailFile(logfile, lines=20)]
+    return (False, "OS rename script failed (%s), last lines in the"
+            " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
 
 
-  return True
+  return (True, "Rename successful")
 
 
 def _GetVGInfo(vg_name):
 
 
 def _GetVGInfo(vg_name):
@@ -693,7 +772,53 @@ def _GetVGInfo(vg_name):
   return retdic
 
 
   return retdic
 
 
-def _GatherBlockDevs(instance):
+def _GetBlockDevSymlinkPath(instance_name, idx):
+  return os.path.join(constants.DISK_LINKS_DIR,
+                      "%s:%d" % (instance_name, idx))
+
+
+def _SymlinkBlockDev(instance_name, device_path, idx):
+  """Set up symlinks to a instance's block device.
+
+  This is an auxiliary function run when an instance is start (on the primary
+  node) or when an instance is migrated (on the target node).
+
+
+  @param instance_name: the name of the target instance
+  @param device_path: path of the physical block device, on the node
+  @param idx: the disk index
+  @return: absolute path to the disk's symlink
+
+  """
+  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
+  try:
+    os.symlink(device_path, link_name)
+  except OSError, err:
+    if err.errno == errno.EEXIST:
+      if (not os.path.islink(link_name) or
+          os.readlink(link_name) != device_path):
+        os.remove(link_name)
+        os.symlink(device_path, link_name)
+    else:
+      raise
+
+  return link_name
+
+
+def _RemoveBlockDevLinks(instance_name, disks):
+  """Remove the block device symlinks belonging to the given instance.
+
+  """
+  for idx, disk in enumerate(disks):
+    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
+    if os.path.islink(link_name):
+      try:
+        os.remove(link_name)
+      except OSError:
+        logging.exception("Can't remove symlink '%s'", link_name)
+
+
+def _GatherAndLinkBlockDevs(instance):
   """Set up an instance's block device(s).
 
   This is run on the primary node at instance startup. The block
   """Set up an instance's block device(s).
 
   This is run on the primary node at instance startup. The block
@@ -701,18 +826,25 @@ def _GatherBlockDevs(instance):
 
   @type instance: L{objects.Instance}
   @param instance: the instance whose disks we shoul assemble
 
   @type instance: L{objects.Instance}
   @param instance: the instance whose disks we shoul assemble
-  @rtype: list of L{bdev.BlockDev}
-  @return: list of the block devices
+  @rtype: list
+  @return: list of (disk_object, device_path)
 
   """
   block_devices = []
 
   """
   block_devices = []
-  for disk in instance.disks:
+  for idx, disk in enumerate(instance.disks):
     device = _RecursiveFindBD(disk)
     if device is None:
       raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                     str(disk))
     device.Open()
     device = _RecursiveFindBD(disk)
     if device is None:
       raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                     str(disk))
     device.Open()
-    block_devices.append((disk, device))
+    try:
+      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
+    except OSError, e:
+      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
+                                    e.strerror)
+
+    block_devices.append((disk, link_name))
+
   return block_devices
 
 
   return block_devices
 
 
@@ -728,18 +860,21 @@ def StartInstance(instance, extra_args):
   running_instances = GetInstanceList([instance.hypervisor])
 
   if instance.name in running_instances:
   running_instances = GetInstanceList([instance.hypervisor])
 
   if instance.name in running_instances:
-    return True
-
-  block_devices = _GatherBlockDevs(instance)
-  hyper = hypervisor.GetHypervisor(instance.hypervisor)
+    return (True, "Already running")
 
   try:
 
   try:
+    block_devices = _GatherAndLinkBlockDevs(instance)
+    hyper = hypervisor.GetHypervisor(instance.hypervisor)
     hyper.StartInstance(instance, block_devices, extra_args)
     hyper.StartInstance(instance, block_devices, extra_args)
+  except errors.BlockDeviceError, err:
+    logging.exception("Failed to start instance")
+    return (False, "Block device error: %s" % str(err))
   except errors.HypervisorError, err:
     logging.exception("Failed to start instance")
   except errors.HypervisorError, err:
     logging.exception("Failed to start instance")
-    return False
+    _RemoveBlockDevLinks(instance.name, instance.disks)
+    return (False, "Hypervisor error: %s" % str(err))
 
 
-  return True
+  return (True, "Instance started successfully")
 
 
 def ShutdownInstance(instance):
 
 
 def ShutdownInstance(instance):
@@ -763,11 +898,10 @@ def ShutdownInstance(instance):
   try:
     hyper.StopInstance(instance)
   except errors.HypervisorError, err:
   try:
     hyper.StopInstance(instance)
   except errors.HypervisorError, err:
-    logging.error("Failed to stop instance")
+    logging.error("Failed to stop instance: %s" % err)
     return False
 
   # test every 10secs for 2min
     return False
 
   # test every 10secs for 2min
-  shutdown_ok = False
 
   time.sleep(1)
   for dummy in range(11):
 
   time.sleep(1)
   for dummy in range(11):
@@ -776,20 +910,23 @@ def ShutdownInstance(instance):
     time.sleep(10)
   else:
     # the shutdown did not succeed
     time.sleep(10)
   else:
     # the shutdown did not succeed
-    logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
+    logging.error("Shutdown of '%s' unsuccessful, using destroy",
+                  instance.name)
 
     try:
       hyper.StopInstance(instance, force=True)
     except errors.HypervisorError, err:
 
     try:
       hyper.StopInstance(instance, force=True)
     except errors.HypervisorError, err:
-      logging.exception("Failed to stop instance")
+      logging.exception("Failed to stop instance: %s" % err)
       return False
 
     time.sleep(1)
     if instance.name in GetInstanceList([hv_name]):
       return False
 
     time.sleep(1)
     if instance.name in GetInstanceList([hv_name]):
-      logging.error("could not shutdown instance '%s' even by destroy",
+      logging.error("Could not shutdown instance '%s' even by destroy",
                     instance.name)
       return False
 
                     instance.name)
       return False
 
+  _RemoveBlockDevLinks(instance.name, instance.disks)
+
   return True
 
 
   return True
 
 
@@ -838,6 +975,65 @@ def RebootInstance(instance, reboot_type, extra_args):
   return True
 
 
   return True
 
 
+def MigrationInfo(instance):
+  """Gather information about an instance to be migrated.
+
+  @type instance: L{objects.Instance}
+  @param instance: the instance definition
+
+  """
+  hyper = hypervisor.GetHypervisor(instance.hypervisor)
+  try:
+    info = hyper.MigrationInfo(instance)
+  except errors.HypervisorError, err:
+    msg = "Failed to fetch migration information"
+    logging.exception(msg)
+    return (False, '%s: %s' % (msg, err))
+  return (True, info)
+
+
+def AcceptInstance(instance, info, target):
+  """Prepare the node to accept an instance.
+
+  @type instance: L{objects.Instance}
+  @param instance: the instance definition
+  @type info: string/data (opaque)
+  @param info: migration information, from the source node
+  @type target: string
+  @param target: target host (usually ip), on this node
+
+  """
+  hyper = hypervisor.GetHypervisor(instance.hypervisor)
+  try:
+    hyper.AcceptInstance(instance, info, target)
+  except errors.HypervisorError, err:
+    msg = "Failed to accept instance"
+    logging.exception(msg)
+    return (False, '%s: %s' % (msg, err))
+  return (True, "Accept successfull")
+
+
+def FinalizeMigration(instance, info, success):
+  """Finalize any preparation to accept an instance.
+
+  @type instance: L{objects.Instance}
+  @param instance: the instance definition
+  @type info: string/data (opaque)
+  @param info: migration information, from the source node
+  @type success: boolean
+  @param success: whether the migration was a success or a failure
+
+  """
+  hyper = hypervisor.GetHypervisor(instance.hypervisor)
+  try:
+    hyper.FinalizeMigration(instance, info, success)
+  except errors.HypervisorError, err:
+    msg = "Failed to finalize migration"
+    logging.exception(msg)
+    return (False, '%s: %s' % (msg, err))
+  return (True, "Migration Finalized")
+
+
 def MigrateInstance(instance, target, live):
   """Migrates an instance to another node.
 
 def MigrateInstance(instance, target, live):
   """Migrates an instance to another node.
 
@@ -854,14 +1050,14 @@ def MigrateInstance(instance, target, live):
       - msg is a string with details in case of failure
 
   """
       - msg is a string with details in case of failure
 
   """
-  hyper = hypervisor.GetHypervisor(instance.hypervisor_name)
+  hyper = hypervisor.GetHypervisor(instance.hypervisor)
 
   try:
     hyper.MigrateInstance(instance.name, target, live)
   except errors.HypervisorError, err:
 
   try:
     hyper.MigrateInstance(instance.name, target, live)
   except errors.HypervisorError, err:
-    msg = "Failed to migrate instance: %s" % str(err)
-    logging.error(msg)
-    return (False, msg)
+    msg = "Failed to migrate instance"
+    logging.exception(msg)
+    return (False, "%s: %s" % (msg, err))
   return (True, "Migration successfull")
 
 
   return (True, "Migration successfull")
 
 
@@ -895,25 +1091,17 @@ def CreateBlockDevice(disk, size, owner, on_primary, info):
         # be assembled
         crdev.Open()
       clist.append(crdev)
         # be assembled
         crdev.Open()
       clist.append(crdev)
+
   try:
   try:
-    device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
-    if device is not None:
-      logging.info("removing existing device %s", disk)
-      device.Remove()
-  except errors.BlockDeviceError, err:
-    pass
+    device = bdev.Create(disk.dev_type, disk.physical_id, clist, size)
+  except errors.GenericError, err:
+    return False, "Can't create block device: %s" % str(err)
 
 
-  device = bdev.Create(disk.dev_type, disk.physical_id,
-                       clist, size)
-  if device is None:
-    raise ValueError("Can't create child device for %s, %s" %
-                     (disk, size))
   if on_primary or disk.AssembleOnSecondary():
     if not device.Assemble():
   if on_primary or disk.AssembleOnSecondary():
     if not device.Assemble():
-      errorstring = "Can't assemble device after creation"
+      errorstring = "Can't assemble device after creation, very unusual event"
       logging.error(errorstring)
       logging.error(errorstring)
-      raise errors.BlockDeviceError("%s, very unusual event - check the node"
-                                    " daemon logs" % errorstring)
+      return False, errorstring
     device.SetSyncSpeed(constants.SYNC_SPEED)
     if on_primary or disk.OpenOnSecondary():
       device.Open(force=True)
     device.SetSyncSpeed(constants.SYNC_SPEED)
     if on_primary or disk.OpenOnSecondary():
       device.Open(force=True)
@@ -923,7 +1111,7 @@ def CreateBlockDevice(disk, size, owner, on_primary, info):
   device.SetInfo(info)
 
   physical_id = device.unique_id
   device.SetInfo(info)
 
   physical_id = device.unique_id
-  return physical_id
+  return True, physical_id
 
 
 def RemoveBlockDevice(disk):
 
 
 def RemoveBlockDevice(disk):
@@ -931,16 +1119,14 @@ def RemoveBlockDevice(disk):
 
   @note: This is intended to be called recursively.
 
 
   @note: This is intended to be called recursively.
 
-  @type disk: L{objects.disk}
+  @type disk: L{objects.Disk}
   @param disk: the disk object we should remove
   @rtype: boolean
   @return: the success of the operation
 
   """
   try:
   @param disk: the disk object we should remove
   @rtype: boolean
   @return: the success of the operation
 
   """
   try:
-    # since we are removing the device, allow a partial match
-    # this allows removal of broken mirrors
-    rdev = _RecursiveFindBD(disk, allow_partial=True)
+    rdev = _RecursiveFindBD(disk)
   except errors.BlockDeviceError, err:
     # probably can't attach
     logging.info("Can't attach to device %s in remove", disk)
   except errors.BlockDeviceError, err:
     # probably can't attach
     logging.info("Can't attach to device %s in remove", disk)
@@ -998,7 +1184,7 @@ def _RecursiveAssembleBD(disk, owner, as_primary):
       children.append(cdev)
 
   if as_primary or disk.AssembleOnSecondary():
       children.append(cdev)
 
   if as_primary or disk.AssembleOnSecondary():
-    r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
+    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children)
     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
     result = r_dev
     if as_primary or disk.OpenOnSecondary():
     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
     result = r_dev
     if as_primary or disk.OpenOnSecondary():
@@ -1030,8 +1216,9 @@ def AssembleBlockDevice(disk, owner, as_primary):
 def ShutdownBlockDevice(disk):
   """Shut down a block device.
 
 def ShutdownBlockDevice(disk):
   """Shut down a block device.
 
-  First, if the device is assembled (can L{Attach()}), then the device
-  is shutdown. Then the children of the device are shutdown.
+  First, if the device is assembled (Attach() is successfull), then
+  the device is shutdown. Then the children of the device are
+  shutdown.
 
   This function is called recursively. Note that we don't cache the
   children or such, as oppossed to assemble, shutdown of different
 
   This function is called recursively. Note that we don't cache the
   children or such, as oppossed to assemble, shutdown of different
@@ -1069,7 +1256,7 @@ def MirrorAddChildren(parent_cdev, new_cdevs):
   @return: the success of the operation
 
   """
   @return: the success of the operation
 
   """
-  parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
+  parent_bdev = _RecursiveFindBD(parent_cdev)
   if parent_bdev is None:
     logging.error("Can't find parent device")
     return False
   if parent_bdev is None:
     logging.error("Can't find parent device")
     return False
@@ -1122,7 +1309,7 @@ def GetMirrorStatus(disks):
   @rtype: disk
   @return:
       a list of (mirror_done, estimated_time) tuples, which
   @rtype: disk
   @return:
       a list of (mirror_done, estimated_time) tuples, which
-      are the result of L{bdev.BlockDevice.CombinedSyncStatus}
+      are the result of L{bdev.BlockDev.CombinedSyncStatus}
   @raise errors.BlockDeviceError: if any of the disks cannot be
       found
 
   @raise errors.BlockDeviceError: if any of the disks cannot be
       found
 
@@ -1136,17 +1323,13 @@ def GetMirrorStatus(disks):
   return stats
 
 
   return stats
 
 
-def _RecursiveFindBD(disk, allow_partial=False):
+def _RecursiveFindBD(disk):
   """Check if a device is activated.
 
   If so, return informations about the real device.
 
   @type disk: L{objects.Disk}
   @param disk: the disk object we need to find
   """Check if a device is activated.
 
   If so, return informations about the real device.
 
   @type disk: L{objects.Disk}
   @param disk: the disk object we need to find
-  @type allow_partial: boolean
-  @param allow_partial: if true, don't abort the find if a
-      child of the device can't be found; this is intended
-      to be used when repairing mirrors
 
   @return: None if the device can't be found,
       otherwise the device instance
 
   @return: None if the device can't be found,
       otherwise the device instance
@@ -1221,13 +1404,20 @@ def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
                  " upload targets: '%s'", file_name)
     return False
 
                  " upload targets: '%s'", file_name)
     return False
 
-  utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
+  raw_data = _Decompress(data)
+
+  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
                   atime=atime, mtime=mtime)
   return True
 
 
 def WriteSsconfFiles(values):
                   atime=atime, mtime=mtime)
   return True
 
 
 def WriteSsconfFiles(values):
-  ssconf.WriteSsconfFiles(values)
+  """Update all ssconf files.
+
+  Wrapper around the SimpleStore.WriteFiles.
+
+  """
+  ssconf.SimpleStore().WriteFiles(values)
 
 
 def _ErrnoOrStr(err):
 
 
 def _ErrnoOrStr(err):
@@ -1596,15 +1786,16 @@ def FinalizeExport(instance, snap_disks):
              instance.beparams[constants.BE_VCPUS])
   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
 
              instance.beparams[constants.BE_VCPUS])
   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
 
-  nic_count = 0
+  nic_total = 0
   for nic_count, nic in enumerate(instance.nics):
   for nic_count, nic in enumerate(instance.nics):
+    nic_total += 1
     config.set(constants.INISECT_INS, 'nic%d_mac' %
                nic_count, '%s' % nic.mac)
     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
                '%s' % nic.bridge)
   # TODO: redundant: on load can read nics until it doesn't exist
     config.set(constants.INISECT_INS, 'nic%d_mac' %
                nic_count, '%s' % nic.mac)
     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
                '%s' % nic.bridge)
   # TODO: redundant: on load can read nics until it doesn't exist
-  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
+  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
 
   disk_total = 0
   for disk_count, disk in enumerate(snap_disks):
 
   disk_total = 0
   for disk_count, disk in enumerate(snap_disks):
@@ -1927,7 +2118,7 @@ def JobQueueUpdate(file_name, content):
     return False
 
   # Write and replace the file atomically
     return False
 
   # Write and replace the file atomically
-  utils.WriteFile(file_name, data=content)
+  utils.WriteFile(file_name, data=_Decompress(content))
 
   return True
 
 
   return True
 
@@ -1935,7 +2126,7 @@ def JobQueueUpdate(file_name, content):
 def JobQueueRename(old, new):
   """Renames a job queue file.
 
 def JobQueueRename(old, new):
   """Renames a job queue file.
 
-  This is just a wrapper over L{os.rename} with proper checking.
+  This is just a wrapper over os.rename with proper checking.
 
   @type old: str
   @param old: the old (actual) file name
 
   @type old: str
   @param old: the old (actual) file name
@@ -1948,7 +2139,7 @@ def JobQueueRename(old, new):
   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
     return False
 
   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
     return False
 
-  os.rename(old, new)
+  utils.RenameFile(old, new, mkdir=True)
 
   return True
 
 
   return True
 
@@ -1973,12 +2164,14 @@ def JobQueueSetDrainFlag(drain_flag):
   return True
 
 
   return True
 
 
-def CloseBlockDevices(disks):
+def CloseBlockDevices(instance_name, disks):
   """Closes the given block devices.
 
   This means they will be switched to secondary mode (in case of
   DRBD).
 
   """Closes the given block devices.
 
   This means they will be switched to secondary mode (in case of
   DRBD).
 
+  @param instance_name: if the argument is not empty, the symlinks
+      of this instance will be removed
   @type disks: list of L{objects.Disk}
   @param disks: the list of disks to be closed
   @rtype: tuple (success, message)
   @type disks: list of L{objects.Disk}
   @param disks: the list of disks to be closed
   @rtype: tuple (success, message)
@@ -2004,6 +2197,8 @@ def CloseBlockDevices(disks):
   if msg:
     return (False, "Can't make devices secondary: %s" % ",".join(msg))
   else:
   if msg:
     return (False, "Can't make devices secondary: %s" % ",".join(msg))
   else:
+    if instance_name:
+      _RemoveBlockDevLinks(instance_name, disks)
     return (True, "All devices secondary")
 
 
     return (True, "All devices secondary")
 
 
@@ -2029,6 +2224,145 @@ def ValidateHVParams(hvname, hvparams):
     return (False, str(err))
 
 
     return (False, str(err))
 
 
+def DemoteFromMC():
+  """Demotes the current node from master candidate role.
+
+  """
+  # try to ensure we're not the master by mistake
+  master, myself = ssconf.GetMasterAndMyself()
+  if master == myself:
+    return (False, "ssconf status shows I'm the master node, will not demote")
+  pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
+  if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
+    return (False, "The master daemon is running, will not demote")
+  try:
+    utils.CreateBackup(constants.CLUSTER_CONF_FILE)
+  except EnvironmentError, err:
+    if err.errno != errno.ENOENT:
+      return (False, "Error while backing up cluster file: %s" % str(err))
+  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
+  return (True, "Done")
+
+
+def _FindDisks(nodes_ip, disks):
+  """Sets the physical ID on disks and returns the block devices.
+
+  """
+  # set the correct physical ID
+  my_name = utils.HostInfo().name
+  for cf in disks:
+    cf.SetPhysicalID(my_name, nodes_ip)
+
+  bdevs = []
+
+  for cf in disks:
+    rd = _RecursiveFindBD(cf)
+    if rd is None:
+      return (False, "Can't find device %s" % cf)
+    bdevs.append(rd)
+  return (True, bdevs)
+
+
+def DrbdDisconnectNet(nodes_ip, disks):
+  """Disconnects the network on a list of drbd devices.
+
+  """
+  status, bdevs = _FindDisks(nodes_ip, disks)
+  if not status:
+    return status, bdevs
+
+  # disconnect disks
+  for rd in bdevs:
+    try:
+      rd.DisconnectNet()
+    except errors.BlockDeviceError, err:
+      logging.exception("Failed to go into standalone mode")
+      return (False, "Can't change network configuration: %s" % str(err))
+  return (True, "All disks are now disconnected")
+
+
+def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
+  """Attaches the network on a list of drbd devices.
+
+  """
+  status, bdevs = _FindDisks(nodes_ip, disks)
+  if not status:
+    return status, bdevs
+
+  if multimaster:
+    for idx, rd in enumerate(bdevs):
+      try:
+        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
+      except EnvironmentError, err:
+        return (False, "Can't create symlink: %s" % str(err))
+  # reconnect disks, switch to new master configuration and if
+  # needed primary mode
+  for rd in bdevs:
+    try:
+      rd.AttachNet(multimaster)
+    except errors.BlockDeviceError, err:
+      return (False, "Can't change network configuration: %s" % str(err))
+  # wait until the disks are connected; we need to retry the re-attach
+  # if the device becomes standalone, as this might happen if the one
+  # node disconnects and reconnects in a different mode before the
+  # other node reconnects; in this case, one or both of the nodes will
+  # decide it has wrong configuration and switch to standalone
+  RECONNECT_TIMEOUT = 2 * 60
+  sleep_time = 0.100 # start with 100 miliseconds
+  timeout_limit = time.time() + RECONNECT_TIMEOUT
+  while time.time() < timeout_limit:
+    all_connected = True
+    for rd in bdevs:
+      stats = rd.GetProcStatus()
+      if not (stats.is_connected or stats.is_in_resync):
+        all_connected = False
+      if stats.is_standalone:
+        # peer had different config info and this node became
+        # standalone, even though this should not happen with the
+        # new staged way of changing disk configs
+        try:
+          rd.ReAttachNet(multimaster)
+        except errors.BlockDeviceError, err:
+          return (False, "Can't change network configuration: %s" % str(err))
+    if all_connected:
+      break
+    time.sleep(sleep_time)
+    sleep_time = min(5, sleep_time * 1.5)
+  if not all_connected:
+    return (False, "Timeout in disk reconnecting")
+  if multimaster:
+    # change to primary mode
+    for rd in bdevs:
+      rd.Open()
+  if multimaster:
+    msg = "multi-master and primary"
+  else:
+    msg = "single-master"
+  return (True, "Disks are now configured as %s" % msg)
+
+
+def DrbdWaitSync(nodes_ip, disks):
+  """Wait until DRBDs have synchronized.
+
+  """
+  status, bdevs = _FindDisks(nodes_ip, disks)
+  if not status:
+    return status, bdevs
+
+  min_resync = 100
+  alldone = True
+  failure = False
+  for rd in bdevs:
+    stats = rd.GetProcStatus()
+    if not (stats.is_connected or stats.is_in_resync):
+      failure = True
+      break
+    alldone = alldone and (not stats.is_in_resync)
+    if stats.sync_percent is not None:
+      min_resync = min(min_resync, stats.sync_percent)
+  return (not failure, (alldone, min_resync))
+
+
 class HooksRunner(object):
   """Hook runner.
 
 class HooksRunner(object):
   """Hook runner.
 
@@ -2239,7 +2573,7 @@ class DevCacheManager(object):
         node nor not
     @type iv_name: str
     @param iv_name: the instance-visible name of the
         node nor not
     @type iv_name: str
     @param iv_name: the instance-visible name of the
-        device, as in L{objects.Disk.iv_name}
+        device, as in objects.Disk.iv_name
 
     @rtype: None
 
 
     @rtype: None