Implement software release version checks too
[ganeti-local] / lib / backend.py
index 994d8e0..c956ecc 100644 (file)
@@ -426,12 +426,21 @@ def VerifyNode(what, cluster_name):
     result[constants.NV_VGLIST] = ListVolumeGroups()
 
   if constants.NV_VERSION in what:
     result[constants.NV_VGLIST] = ListVolumeGroups()
 
   if constants.NV_VERSION in what:
-    result[constants.NV_VERSION] = constants.PROTOCOL_VERSION
+    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
+                                    constants.RELEASE_VERSION)
 
   if constants.NV_HVINFO in what:
     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
 
 
   if constants.NV_HVINFO in what:
     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
 
+  if constants.NV_DRBDLIST in what:
+    try:
+      used_minors = bdev.DRBD8.GetUsedDevs().keys()
+    except errors.BlockDeviceError:
+      logging.warning("Can't get used minors list", exc_info=True)
+      used_minors = []
+    result[constants.NV_DRBDLIST] = used_minors
+
   return result
 
 
   return result
 
 
@@ -597,6 +606,30 @@ def GetInstanceInfo(instance, hname):
   return output
 
 
   return output
 
 
+def GetInstanceMigratable(instance):
+  """Gives whether an instance can be migrated.
+
+  @type instance: L{objects.Instance}
+  @param instance: object representing the instance to be checked.
+
+  @rtype: tuple
+  @return: tuple of (result, description) where:
+      - result: whether the instance can be migrated or not
+      - description: a description of the issue, if relevant
+
+  """
+  hyper = hypervisor.GetHypervisor(instance.hypervisor)
+  if instance.name not in hyper.ListInstances():
+    return (False, 'not running')
+
+  for idx in range(len(instance.disks)):
+    link_name = _GetBlockDevSymlinkPath(instance.name, idx)
+    if not os.path.islink(link_name):
+      return (False, 'not restarted since ganeti 1.2.5')
+
+  return (True, '')
+
+
 def GetAllInstancesInfo(hypervisor_list):
   """Gather data about all instances.
 
 def GetAllInstancesInfo(hypervisor_list):
   """Gather data about all instances.
 
@@ -657,9 +690,12 @@ def AddOSToInstance(instance):
     logging.error("os create command '%s' returned error: %s, logfile: %s,"
                   " output: %s", result.cmd, result.fail_reason, logfile,
                   result.output)
     logging.error("os create command '%s' returned error: %s, logfile: %s,"
                   " output: %s", result.cmd, result.fail_reason, logfile,
                   result.output)
-    return False
+    lines = [val.encode("string_escape")
+             for val in utils.TailFile(logfile, lines=20)]
+    return (False, "OS create script failed (%s), last lines in the"
+            " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
 
 
-  return True
+  return (True, "Successfully installed")
 
 
 def RunRenameInstance(instance, old_name):
 
 
 def RunRenameInstance(instance, old_name):
@@ -688,9 +724,12 @@ def RunRenameInstance(instance, old_name):
   if result.failed:
     logging.error("os create command '%s' returned error: %s output: %s",
                   result.cmd, result.fail_reason, result.output)
   if result.failed:
     logging.error("os create command '%s' returned error: %s output: %s",
                   result.cmd, result.fail_reason, result.output)
-    return False
+    lines = [val.encode("string_escape")
+             for val in utils.TailFile(logfile, lines=20)]
+    return (False, "OS rename script failed (%s), last lines in the"
+            " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
 
 
-  return True
+  return (True, "Rename successful")
 
 
 def _GetVGInfo(vg_name):
 
 
 def _GetVGInfo(vg_name):
@@ -733,7 +772,53 @@ def _GetVGInfo(vg_name):
   return retdic
 
 
   return retdic
 
 
-def _GatherBlockDevs(instance):
+def _GetBlockDevSymlinkPath(instance_name, idx):
+  return os.path.join(constants.DISK_LINKS_DIR,
+                      "%s:%d" % (instance_name, idx))
+
+
+def _SymlinkBlockDev(instance_name, device_path, idx):
+  """Set up symlinks to a instance's block device.
+
+  This is an auxiliary function run when an instance is start (on the primary
+  node) or when an instance is migrated (on the target node).
+
+
+  @param instance_name: the name of the target instance
+  @param device_path: path of the physical block device, on the node
+  @param idx: the disk index
+  @return: absolute path to the disk's symlink
+
+  """
+  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
+  try:
+    os.symlink(device_path, link_name)
+  except OSError, err:
+    if err.errno == errno.EEXIST:
+      if (not os.path.islink(link_name) or
+          os.readlink(link_name) != device_path):
+        os.remove(link_name)
+        os.symlink(device_path, link_name)
+    else:
+      raise
+
+  return link_name
+
+
+def _RemoveBlockDevLinks(instance_name, disks):
+  """Remove the block device symlinks belonging to the given instance.
+
+  """
+  for idx, disk in enumerate(disks):
+    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
+    if os.path.islink(link_name):
+      try:
+        os.remove(link_name)
+      except OSError:
+        logging.exception("Can't remove symlink '%s'", link_name)
+
+
+def _GatherAndLinkBlockDevs(instance):
   """Set up an instance's block device(s).
 
   This is run on the primary node at instance startup. The block
   """Set up an instance's block device(s).
 
   This is run on the primary node at instance startup. The block
@@ -741,18 +826,25 @@ def _GatherBlockDevs(instance):
 
   @type instance: L{objects.Instance}
   @param instance: the instance whose disks we shoul assemble
 
   @type instance: L{objects.Instance}
   @param instance: the instance whose disks we shoul assemble
-  @rtype: list of L{bdev.BlockDev}
-  @return: list of the block devices
+  @rtype: list
+  @return: list of (disk_object, device_path)
 
   """
   block_devices = []
 
   """
   block_devices = []
-  for disk in instance.disks:
+  for idx, disk in enumerate(instance.disks):
     device = _RecursiveFindBD(disk)
     if device is None:
       raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                     str(disk))
     device.Open()
     device = _RecursiveFindBD(disk)
     if device is None:
       raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                     str(disk))
     device.Open()
-    block_devices.append((disk, device))
+    try:
+      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
+    except OSError, e:
+      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
+                                    e.strerror)
+
+    block_devices.append((disk, link_name))
+
   return block_devices
 
 
   return block_devices
 
 
@@ -768,18 +860,21 @@ def StartInstance(instance, extra_args):
   running_instances = GetInstanceList([instance.hypervisor])
 
   if instance.name in running_instances:
   running_instances = GetInstanceList([instance.hypervisor])
 
   if instance.name in running_instances:
-    return True
-
-  block_devices = _GatherBlockDevs(instance)
-  hyper = hypervisor.GetHypervisor(instance.hypervisor)
+    return (True, "Already running")
 
   try:
 
   try:
+    block_devices = _GatherAndLinkBlockDevs(instance)
+    hyper = hypervisor.GetHypervisor(instance.hypervisor)
     hyper.StartInstance(instance, block_devices, extra_args)
     hyper.StartInstance(instance, block_devices, extra_args)
+  except errors.BlockDeviceError, err:
+    logging.exception("Failed to start instance")
+    return (False, "Block device error: %s" % str(err))
   except errors.HypervisorError, err:
     logging.exception("Failed to start instance")
   except errors.HypervisorError, err:
     logging.exception("Failed to start instance")
-    return False
+    _RemoveBlockDevLinks(instance.name, instance.disks)
+    return (False, "Hypervisor error: %s" % str(err))
 
 
-  return True
+  return (True, "Instance started successfully")
 
 
 def ShutdownInstance(instance):
 
 
 def ShutdownInstance(instance):
@@ -803,7 +898,7 @@ def ShutdownInstance(instance):
   try:
     hyper.StopInstance(instance)
   except errors.HypervisorError, err:
   try:
     hyper.StopInstance(instance)
   except errors.HypervisorError, err:
-    logging.error("Failed to stop instance")
+    logging.error("Failed to stop instance: %s" % err)
     return False
 
   # test every 10secs for 2min
     return False
 
   # test every 10secs for 2min
@@ -815,20 +910,23 @@ def ShutdownInstance(instance):
     time.sleep(10)
   else:
     # the shutdown did not succeed
     time.sleep(10)
   else:
     # the shutdown did not succeed
-    logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
+    logging.error("Shutdown of '%s' unsuccessful, using destroy",
+                  instance.name)
 
     try:
       hyper.StopInstance(instance, force=True)
     except errors.HypervisorError, err:
 
     try:
       hyper.StopInstance(instance, force=True)
     except errors.HypervisorError, err:
-      logging.exception("Failed to stop instance")
+      logging.exception("Failed to stop instance: %s" % err)
       return False
 
     time.sleep(1)
     if instance.name in GetInstanceList([hv_name]):
       return False
 
     time.sleep(1)
     if instance.name in GetInstanceList([hv_name]):
-      logging.error("could not shutdown instance '%s' even by destroy",
+      logging.error("Could not shutdown instance '%s' even by destroy",
                     instance.name)
       return False
 
                     instance.name)
       return False
 
+  _RemoveBlockDevLinks(instance.name, instance.disks)
+
   return True
 
 
   return True
 
 
@@ -877,6 +975,65 @@ def RebootInstance(instance, reboot_type, extra_args):
   return True
 
 
   return True
 
 
+def MigrationInfo(instance):
+  """Gather information about an instance to be migrated.
+
+  @type instance: L{objects.Instance}
+  @param instance: the instance definition
+
+  """
+  hyper = hypervisor.GetHypervisor(instance.hypervisor)
+  try:
+    info = hyper.MigrationInfo(instance)
+  except errors.HypervisorError, err:
+    msg = "Failed to fetch migration information"
+    logging.exception(msg)
+    return (False, '%s: %s' % (msg, err))
+  return (True, info)
+
+
+def AcceptInstance(instance, info, target):
+  """Prepare the node to accept an instance.
+
+  @type instance: L{objects.Instance}
+  @param instance: the instance definition
+  @type info: string/data (opaque)
+  @param info: migration information, from the source node
+  @type target: string
+  @param target: target host (usually ip), on this node
+
+  """
+  hyper = hypervisor.GetHypervisor(instance.hypervisor)
+  try:
+    hyper.AcceptInstance(instance, info, target)
+  except errors.HypervisorError, err:
+    msg = "Failed to accept instance"
+    logging.exception(msg)
+    return (False, '%s: %s' % (msg, err))
+  return (True, "Accept successfull")
+
+
+def FinalizeMigration(instance, info, success):
+  """Finalize any preparation to accept an instance.
+
+  @type instance: L{objects.Instance}
+  @param instance: the instance definition
+  @type info: string/data (opaque)
+  @param info: migration information, from the source node
+  @type success: boolean
+  @param success: whether the migration was a success or a failure
+
+  """
+  hyper = hypervisor.GetHypervisor(instance.hypervisor)
+  try:
+    hyper.FinalizeMigration(instance, info, success)
+  except errors.HypervisorError, err:
+    msg = "Failed to finalize migration"
+    logging.exception(msg)
+    return (False, '%s: %s' % (msg, err))
+  return (True, "Migration Finalized")
+
+
 def MigrateInstance(instance, target, live):
   """Migrates an instance to another node.
 
 def MigrateInstance(instance, target, live):
   """Migrates an instance to another node.
 
@@ -893,14 +1050,14 @@ def MigrateInstance(instance, target, live):
       - msg is a string with details in case of failure
 
   """
       - msg is a string with details in case of failure
 
   """
-  hyper = hypervisor.GetHypervisor(instance.hypervisor_name)
+  hyper = hypervisor.GetHypervisor(instance.hypervisor)
 
   try:
     hyper.MigrateInstance(instance.name, target, live)
   except errors.HypervisorError, err:
 
   try:
     hyper.MigrateInstance(instance.name, target, live)
   except errors.HypervisorError, err:
-    msg = "Failed to migrate instance: %s" % str(err)
-    logging.error(msg)
-    return (False, msg)
+    msg = "Failed to migrate instance"
+    logging.exception(msg)
+    return (False, "%s: %s" % (msg, err))
   return (True, "Migration successfull")
 
 
   return (True, "Migration successfull")
 
 
@@ -934,25 +1091,17 @@ def CreateBlockDevice(disk, size, owner, on_primary, info):
         # be assembled
         crdev.Open()
       clist.append(crdev)
         # be assembled
         crdev.Open()
       clist.append(crdev)
+
   try:
   try:
-    device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
-    if device is not None:
-      logging.info("removing existing device %s", disk)
-      device.Remove()
-  except errors.BlockDeviceError, err:
-    pass
+    device = bdev.Create(disk.dev_type, disk.physical_id, clist, size)
+  except errors.GenericError, err:
+    return False, "Can't create block device: %s" % str(err)
 
 
-  device = bdev.Create(disk.dev_type, disk.physical_id,
-                       clist, size)
-  if device is None:
-    raise ValueError("Can't create child device for %s, %s" %
-                     (disk, size))
   if on_primary or disk.AssembleOnSecondary():
     if not device.Assemble():
   if on_primary or disk.AssembleOnSecondary():
     if not device.Assemble():
-      errorstring = "Can't assemble device after creation"
+      errorstring = "Can't assemble device after creation, very unusual event"
       logging.error(errorstring)
       logging.error(errorstring)
-      raise errors.BlockDeviceError("%s, very unusual event - check the node"
-                                    " daemon logs" % errorstring)
+      return False, errorstring
     device.SetSyncSpeed(constants.SYNC_SPEED)
     if on_primary or disk.OpenOnSecondary():
       device.Open(force=True)
     device.SetSyncSpeed(constants.SYNC_SPEED)
     if on_primary or disk.OpenOnSecondary():
       device.Open(force=True)
@@ -962,7 +1111,7 @@ def CreateBlockDevice(disk, size, owner, on_primary, info):
   device.SetInfo(info)
 
   physical_id = device.unique_id
   device.SetInfo(info)
 
   physical_id = device.unique_id
-  return physical_id
+  return True, physical_id
 
 
 def RemoveBlockDevice(disk):
 
 
 def RemoveBlockDevice(disk):
@@ -1035,7 +1184,7 @@ def _RecursiveAssembleBD(disk, owner, as_primary):
       children.append(cdev)
 
   if as_primary or disk.AssembleOnSecondary():
       children.append(cdev)
 
   if as_primary or disk.AssembleOnSecondary():
-    r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
+    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children)
     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
     result = r_dev
     if as_primary or disk.OpenOnSecondary():
     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
     result = r_dev
     if as_primary or disk.OpenOnSecondary():
@@ -1637,15 +1786,16 @@ def FinalizeExport(instance, snap_disks):
              instance.beparams[constants.BE_VCPUS])
   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
 
              instance.beparams[constants.BE_VCPUS])
   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
 
-  nic_count = 0
+  nic_total = 0
   for nic_count, nic in enumerate(instance.nics):
   for nic_count, nic in enumerate(instance.nics):
+    nic_total += 1
     config.set(constants.INISECT_INS, 'nic%d_mac' %
                nic_count, '%s' % nic.mac)
     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
                '%s' % nic.bridge)
   # TODO: redundant: on load can read nics until it doesn't exist
     config.set(constants.INISECT_INS, 'nic%d_mac' %
                nic_count, '%s' % nic.mac)
     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
                '%s' % nic.bridge)
   # TODO: redundant: on load can read nics until it doesn't exist
-  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
+  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
 
   disk_total = 0
   for disk_count, disk in enumerate(snap_disks):
 
   disk_total = 0
   for disk_count, disk in enumerate(snap_disks):
@@ -1989,7 +2139,7 @@ def JobQueueRename(old, new):
   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
     return False
 
   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
     return False
 
-  os.rename(old, new)
+  utils.RenameFile(old, new, mkdir=True)
 
   return True
 
 
   return True
 
@@ -2014,12 +2164,14 @@ def JobQueueSetDrainFlag(drain_flag):
   return True
 
 
   return True
 
 
-def CloseBlockDevices(disks):
+def CloseBlockDevices(instance_name, disks):
   """Closes the given block devices.
 
   This means they will be switched to secondary mode (in case of
   DRBD).
 
   """Closes the given block devices.
 
   This means they will be switched to secondary mode (in case of
   DRBD).
 
+  @param instance_name: if the argument is not empty, the symlinks
+      of this instance will be removed
   @type disks: list of L{objects.Disk}
   @param disks: the list of disks to be closed
   @rtype: tuple (success, message)
   @type disks: list of L{objects.Disk}
   @param disks: the list of disks to be closed
   @rtype: tuple (success, message)
@@ -2045,6 +2197,8 @@ def CloseBlockDevices(disks):
   if msg:
     return (False, "Can't make devices secondary: %s" % ",".join(msg))
   else:
   if msg:
     return (False, "Can't make devices secondary: %s" % ",".join(msg))
   else:
+    if instance_name:
+      _RemoveBlockDevLinks(instance_name, disks)
     return (True, "All devices secondary")
 
 
     return (True, "All devices secondary")
 
 
@@ -2090,6 +2244,125 @@ def DemoteFromMC():
   return (True, "Done")
 
 
   return (True, "Done")
 
 
+def _FindDisks(nodes_ip, disks):
+  """Sets the physical ID on disks and returns the block devices.
+
+  """
+  # set the correct physical ID
+  my_name = utils.HostInfo().name
+  for cf in disks:
+    cf.SetPhysicalID(my_name, nodes_ip)
+
+  bdevs = []
+
+  for cf in disks:
+    rd = _RecursiveFindBD(cf)
+    if rd is None:
+      return (False, "Can't find device %s" % cf)
+    bdevs.append(rd)
+  return (True, bdevs)
+
+
+def DrbdDisconnectNet(nodes_ip, disks):
+  """Disconnects the network on a list of drbd devices.
+
+  """
+  status, bdevs = _FindDisks(nodes_ip, disks)
+  if not status:
+    return status, bdevs
+
+  # disconnect disks
+  for rd in bdevs:
+    try:
+      rd.DisconnectNet()
+    except errors.BlockDeviceError, err:
+      logging.exception("Failed to go into standalone mode")
+      return (False, "Can't change network configuration: %s" % str(err))
+  return (True, "All disks are now disconnected")
+
+
+def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
+  """Attaches the network on a list of drbd devices.
+
+  """
+  status, bdevs = _FindDisks(nodes_ip, disks)
+  if not status:
+    return status, bdevs
+
+  if multimaster:
+    for idx, rd in enumerate(bdevs):
+      try:
+        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
+      except EnvironmentError, err:
+        return (False, "Can't create symlink: %s" % str(err))
+  # reconnect disks, switch to new master configuration and if
+  # needed primary mode
+  for rd in bdevs:
+    try:
+      rd.AttachNet(multimaster)
+    except errors.BlockDeviceError, err:
+      return (False, "Can't change network configuration: %s" % str(err))
+  # wait until the disks are connected; we need to retry the re-attach
+  # if the device becomes standalone, as this might happen if the one
+  # node disconnects and reconnects in a different mode before the
+  # other node reconnects; in this case, one or both of the nodes will
+  # decide it has wrong configuration and switch to standalone
+  RECONNECT_TIMEOUT = 2 * 60
+  sleep_time = 0.100 # start with 100 miliseconds
+  timeout_limit = time.time() + RECONNECT_TIMEOUT
+  while time.time() < timeout_limit:
+    all_connected = True
+    for rd in bdevs:
+      stats = rd.GetProcStatus()
+      if not (stats.is_connected or stats.is_in_resync):
+        all_connected = False
+      if stats.is_standalone:
+        # peer had different config info and this node became
+        # standalone, even though this should not happen with the
+        # new staged way of changing disk configs
+        try:
+          rd.ReAttachNet(multimaster)
+        except errors.BlockDeviceError, err:
+          return (False, "Can't change network configuration: %s" % str(err))
+    if all_connected:
+      break
+    time.sleep(sleep_time)
+    sleep_time = min(5, sleep_time * 1.5)
+  if not all_connected:
+    return (False, "Timeout in disk reconnecting")
+  if multimaster:
+    # change to primary mode
+    for rd in bdevs:
+      rd.Open()
+  if multimaster:
+    msg = "multi-master and primary"
+  else:
+    msg = "single-master"
+  return (True, "Disks are now configured as %s" % msg)
+
+
+def DrbdWaitSync(nodes_ip, disks):
+  """Wait until DRBDs have synchronized.
+
+  """
+  status, bdevs = _FindDisks(nodes_ip, disks)
+  if not status:
+    return status, bdevs
+
+  min_resync = 100
+  alldone = True
+  failure = False
+  for rd in bdevs:
+    stats = rd.GetProcStatus()
+    if not (stats.is_connected or stats.is_in_resync):
+      failure = True
+      break
+    alldone = alldone and (not stats.is_in_resync)
+    if stats.sync_percent is not None:
+      min_resync = min(min_resync, stats.sync_percent)
+  return (not failure, (alldone, min_resync))
+
+
 class HooksRunner(object):
   """Hook runner.
 
 class HooksRunner(object):
   """Hook runner.