Create symlinks to intances' block devices
[ganeti-local] / lib / backend.py
index 5d79743..2a483db 100644 (file)
@@ -33,6 +33,8 @@ import subprocess
 import random
 import logging
 import tempfile
+import zlib
+import base64
 
 from ganeti import errors
 from ganeti import utils
@@ -45,13 +47,13 @@ from ganeti import ssconf
 
 
 def _GetConfig():
-  """Simple wrapper to return a ConfigReader.
+  """Simple wrapper to return a SimpleStore.
 
-  @rtype: L{ssconf.SimpleConfigReader}
-  @return: a SimpleConfigReader instance
+  @rtype: L{ssconf.SimpleStore}
+  @return: a SimpleStore instance
 
   """
-  return ssconf.SimpleConfigReader()
+  return ssconf.SimpleStore()
 
 
 def _GetSshRunner(cluster_name):
@@ -67,7 +69,27 @@ def _GetSshRunner(cluster_name):
   return ssh.SshRunner(cluster_name)
 
 
-def _CleanDirectory(path, exclude=[]):
+def _Decompress(data):
+  """Unpacks data compressed by the RPC client.
+
+  @type data: list or tuple
+  @param data: Data sent by RPC client
+  @rtype: str
+  @return: Decompressed data
+
+  """
+  assert isinstance(data, (list, tuple))
+  assert len(data) == 2
+  (encoding, content) = data
+  if encoding == constants.RPC_ENCODING_NONE:
+    return content
+  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
+    return zlib.decompress(base64.b64decode(content))
+  else:
+    raise AssertionError("Unknown data encoding")
+
+
+def _CleanDirectory(path, exclude=None):
   """Removes all regular files in a directory.
 
   @type path: str
@@ -75,14 +97,15 @@ def _CleanDirectory(path, exclude=[]):
   @type exclude: list
   @param exclude: list of files to be excluded, defaults
       to the empty list
-  @rtype: None
 
   """
   if not os.path.isdir(path):
     return
-
-  # Normalize excluded paths
-  exclude = [os.path.normpath(i) for i in exclude]
+  if exclude is None:
+    exclude = []
+  else:
+    # Normalize excluded paths
+    exclude = [os.path.normpath(i) for i in exclude]
 
   for rel_name in utils.ListVisibleFiles(path):
     full_name = os.path.normpath(os.path.join(path, rel_name))
@@ -257,7 +280,7 @@ def LeaveCluster():
   from the cluster.
 
   If processing is successful, then it raises an
-  L{errors.GanetiQuitException} which is used as a special case to
+  L{errors.QuitGanetiException} which is used as a special case to
   shutdown the node daemon.
 
   """
@@ -350,37 +373,38 @@ def VerifyNode(what, cluster_name):
   """
   result = {}
 
-  if 'hypervisor' in what:
-    result['hypervisor'] = my_dict = {}
-    for hv_name in what['hypervisor']:
-      my_dict[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
+  if constants.NV_HYPERVISOR in what:
+    result[constants.NV_HYPERVISOR] = tmp = {}
+    for hv_name in what[constants.NV_HYPERVISOR]:
+      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
 
-  if 'filelist' in what:
-    result['filelist'] = utils.FingerprintFiles(what['filelist'])
+  if constants.NV_FILELIST in what:
+    result[constants.NV_FILELIST] = utils.FingerprintFiles(
+      what[constants.NV_FILELIST])
 
-  if 'nodelist' in what:
-    result['nodelist'] = {}
-    random.shuffle(what['nodelist'])
-    for node in what['nodelist']:
+  if constants.NV_NODELIST in what:
+    result[constants.NV_NODELIST] = tmp = {}
+    random.shuffle(what[constants.NV_NODELIST])
+    for node in what[constants.NV_NODELIST]:
       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
       if not success:
-        result['nodelist'][node] = message
-  if 'node-net-test' in what:
-    result['node-net-test'] = {}
+        tmp[node] = message
+
+  if constants.NV_NODENETTEST in what:
+    result[constants.NV_NODENETTEST] = tmp = {}
     my_name = utils.HostInfo().name
     my_pip = my_sip = None
-    for name, pip, sip in what['node-net-test']:
+    for name, pip, sip in what[constants.NV_NODENETTEST]:
       if name == my_name:
         my_pip = pip
         my_sip = sip
         break
     if not my_pip:
-      result['node-net-test'][my_name] = ("Can't find my own"
-                                          " primary/secondary IP"
-                                          " in the node list")
+      tmp[my_name] = ("Can't find my own primary/secondary IP"
+                      " in the node list")
     else:
       port = utils.GetNodeDaemonPort()
-      for name, pip, sip in what['node-net-test']:
+      for name, pip, sip in what[constants.NV_NODENETTEST]:
         fail = []
         if not utils.TcpPing(pip, port, source=my_pip):
           fail.append("primary")
@@ -388,9 +412,25 @@ def VerifyNode(what, cluster_name):
           if not utils.TcpPing(sip, port, source=my_sip):
             fail.append("secondary")
         if fail:
-          result['node-net-test'][name] = ("failure using the %s"
-                                           " interface(s)" %
-                                           " and ".join(fail))
+          tmp[name] = ("failure using the %s interface(s)" %
+                       " and ".join(fail))
+
+  if constants.NV_LVLIST in what:
+    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
+
+  if constants.NV_INSTANCELIST in what:
+    result[constants.NV_INSTANCELIST] = GetInstanceList(
+      what[constants.NV_INSTANCELIST])
+
+  if constants.NV_VGLIST in what:
+    result[constants.NV_VGLIST] = ListVolumeGroups()
+
+  if constants.NV_VERSION in what:
+    result[constants.NV_VERSION] = constants.PROTOCOL_VERSION
+
+  if constants.NV_HVINFO in what:
+    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
+    result[constants.NV_HVINFO] = hyper.GetNodeInfo()
 
   return result
 
@@ -693,7 +733,38 @@ def _GetVGInfo(vg_name):
   return retdic
 
 
-def _GatherBlockDevs(instance):
+def _SymlinkBlockDev(instance_name, device_path, device_name):
+  """Set up symlinks to a instance's block device.
+
+  This is an auxiliary function run when an instance is start (on the primary
+  node) or when an instance is migrated (on the target node).
+
+  Args:
+    instance_name: the name of the target instance
+    device_path: path of the physical block device, on the node
+    device_name: 'virtual' name of the device
+
+  Returns:
+    absolute path to the disk's symlink
+
+  """
+  link_basename = "%s-%s" % (instance_name, device_name)
+  link_name = os.path.join(constants.DISK_LINKS_DIR, link_basename)
+  try:
+    os.symlink(device_path, link_name)
+  except OSError, e:
+    if e.errno == errno.EEXIST:
+      if (not os.path.islink(link_name) or
+          os.readlink(link_name) != device_path):
+        os.remove(link_name)
+        os.symlink(device_path, link_name)
+    else:
+      raise
+
+  return link_name
+
+
+def _GatherAndLinkBlockDevs(instance):
   """Set up an instance's block device(s).
 
   This is run on the primary node at instance startup. The block
@@ -701,18 +772,26 @@ def _GatherBlockDevs(instance):
 
   @type instance: L{objects.Instance}
   @param instance: the instance whose disks we shoul assemble
-  @rtype: list of L{bdev.BlockDev}
-  @return: list of the block devices
+  @rtype: list
+  @return: list of (disk_object, device_path)
 
   """
   block_devices = []
-  for disk in instance.disks:
+  for idx, disk in enumerate(instance.disks):
     device = _RecursiveFindBD(disk)
     if device is None:
       raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                     str(disk))
     device.Open()
-    block_devices.append((disk, device))
+    try:
+      link_name = _SymlinkBlockDev(instance.name, device.dev_path,
+                                   "disk%d" % idx)
+    except OSError, e:
+      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
+                                    e.strerror)
+
+    block_devices.append((disk, link_name))
+
   return block_devices
 
 
@@ -730,7 +809,7 @@ def StartInstance(instance, extra_args):
   if instance.name in running_instances:
     return True
 
-  block_devices = _GatherBlockDevs(instance)
+  block_devices = _GatherAndLinkBlockDevs(instance)
   hyper = hypervisor.GetHypervisor(instance.hypervisor)
 
   try:
@@ -767,7 +846,6 @@ def ShutdownInstance(instance):
     return False
 
   # test every 10secs for 2min
-  shutdown_ok = False
 
   time.sleep(1)
   for dummy in range(11):
@@ -931,16 +1009,14 @@ def RemoveBlockDevice(disk):
 
   @note: This is intended to be called recursively.
 
-  @type disk: L{objects.disk}
+  @type disk: L{objects.Disk}
   @param disk: the disk object we should remove
   @rtype: boolean
   @return: the success of the operation
 
   """
   try:
-    # since we are removing the device, allow a partial match
-    # this allows removal of broken mirrors
-    rdev = _RecursiveFindBD(disk, allow_partial=True)
+    rdev = _RecursiveFindBD(disk)
   except errors.BlockDeviceError, err:
     # probably can't attach
     logging.info("Can't attach to device %s in remove", disk)
@@ -1030,8 +1106,9 @@ def AssembleBlockDevice(disk, owner, as_primary):
 def ShutdownBlockDevice(disk):
   """Shut down a block device.
 
-  First, if the device is assembled (can L{Attach()}), then the device
-  is shutdown. Then the children of the device are shutdown.
+  First, if the device is assembled (Attach() is successfull), then
+  the device is shutdown. Then the children of the device are
+  shutdown.
 
   This function is called recursively. Note that we don't cache the
   children or such, as oppossed to assemble, shutdown of different
@@ -1069,7 +1146,7 @@ def MirrorAddChildren(parent_cdev, new_cdevs):
   @return: the success of the operation
 
   """
-  parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
+  parent_bdev = _RecursiveFindBD(parent_cdev)
   if parent_bdev is None:
     logging.error("Can't find parent device")
     return False
@@ -1122,7 +1199,7 @@ def GetMirrorStatus(disks):
   @rtype: disk
   @return:
       a list of (mirror_done, estimated_time) tuples, which
-      are the result of L{bdev.BlockDevice.CombinedSyncStatus}
+      are the result of L{bdev.BlockDev.CombinedSyncStatus}
   @raise errors.BlockDeviceError: if any of the disks cannot be
       found
 
@@ -1136,17 +1213,13 @@ def GetMirrorStatus(disks):
   return stats
 
 
-def _RecursiveFindBD(disk, allow_partial=False):
+def _RecursiveFindBD(disk):
   """Check if a device is activated.
 
   If so, return informations about the real device.
 
   @type disk: L{objects.Disk}
   @param disk: the disk object we need to find
-  @type allow_partial: boolean
-  @param allow_partial: if true, don't abort the find if a
-      child of the device can't be found; this is intended
-      to be used when repairing mirrors
 
   @return: None if the device can't be found,
       otherwise the device instance
@@ -1221,13 +1294,20 @@ def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
                  " upload targets: '%s'", file_name)
     return False
 
-  utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
+  raw_data = _Decompress(data)
+
+  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
                   atime=atime, mtime=mtime)
   return True
 
 
 def WriteSsconfFiles(values):
-  ssconf.WriteSsconfFiles(values)
+  """Update all ssconf files.
+
+  Wrapper around the SimpleStore.WriteFiles.
+
+  """
+  ssconf.SimpleStore().WriteFiles(values)
 
 
 def _ErrnoOrStr(err):
@@ -1596,15 +1676,16 @@ def FinalizeExport(instance, snap_disks):
              instance.beparams[constants.BE_VCPUS])
   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
 
-  nic_count = 0
+  nic_total = 0
   for nic_count, nic in enumerate(instance.nics):
+    nic_total += 1
     config.set(constants.INISECT_INS, 'nic%d_mac' %
                nic_count, '%s' % nic.mac)
     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
                '%s' % nic.bridge)
   # TODO: redundant: on load can read nics until it doesn't exist
-  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
+  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
 
   disk_total = 0
   for disk_count, disk in enumerate(snap_disks):
@@ -1927,7 +2008,7 @@ def JobQueueUpdate(file_name, content):
     return False
 
   # Write and replace the file atomically
-  utils.WriteFile(file_name, data=content)
+  utils.WriteFile(file_name, data=_Decompress(content))
 
   return True
 
@@ -1935,7 +2016,7 @@ def JobQueueUpdate(file_name, content):
 def JobQueueRename(old, new):
   """Renames a job queue file.
 
-  This is just a wrapper over L{os.rename} with proper checking.
+  This is just a wrapper over os.rename with proper checking.
 
   @type old: str
   @param old: the old (actual) file name
@@ -1948,7 +2029,7 @@ def JobQueueRename(old, new):
   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
     return False
 
-  os.rename(old, new)
+  utils.RenameFile(old, new, mkdir=True)
 
   return True
 
@@ -2029,6 +2110,26 @@ def ValidateHVParams(hvname, hvparams):
     return (False, str(err))
 
 
+def DemoteFromMC():
+  """Demotes the current node from master candidate role.
+
+  """
+  # try to ensure we're not the master by mistake
+  master, myself = ssconf.GetMasterAndMyself()
+  if master == myself:
+    return (False, "ssconf status shows I'm the master node, will not demote")
+  pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
+  if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
+    return (False, "The master daemon is running, will not demote")
+  try:
+    utils.CreateBackup(constants.CLUSTER_CONF_FILE)
+  except EnvironmentError, err:
+    if err.errno != errno.ENOENT:
+      return (False, "Error while backing up cluster file: %s" % str(err))
+  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
+  return (True, "Done")
+
+
 class HooksRunner(object):
   """Hook runner.
 
@@ -2239,7 +2340,7 @@ class DevCacheManager(object):
         node nor not
     @type iv_name: str
     @param iv_name: the instance-visible name of the
-        device, as in L{objects.Disk.iv_name}
+        device, as in objects.Disk.iv_name
 
     @rtype: None