First write operation (add tag) for Ganeti RAPI
[ganeti-local] / lib / backend.py
index 382eec5..16b7bb9 100644 (file)
@@ -31,8 +31,9 @@ import errno
 import re
 import subprocess
 import random
+import logging
+import tempfile
 
-from ganeti import logger
 from ganeti import errors
 from ganeti import utils
 from ganeti import ssh
@@ -47,36 +48,86 @@ def _GetSshRunner():
   return ssh.SshRunner()
 
 
-def StartMaster():
+def _GetMasterInfo():
+  """Return the master ip and netdev.
+
+  """
+  try:
+    ss = ssconf.SimpleStore()
+    master_netdev = ss.GetMasterNetdev()
+    master_ip = ss.GetMasterIP()
+  except errors.ConfigurationError, err:
+    logging.exception("Cluster configuration incomplete")
+    return (None, None)
+  return (master_netdev, master_ip)
+
+
+def StartMaster(start_daemons):
   """Activate local node as master node.
 
-  There are two needed steps for this:
-    - run the master script
-    - register the cron script
+  The function will always try activate the IP address of the master
+  (if someone else has it, then it won't). Then, if the start_daemons
+  parameter is True, it will also start the master daemons
+  (ganet-masterd and ganeti-rapi).
 
   """
-  result = utils.RunCmd([constants.MASTER_SCRIPT, "-d", "start"])
-
-  if result.failed:
-    logger.Error("could not activate cluster interface with command %s,"
-                 " error: '%s'" % (result.cmd, result.output))
+  ok = True
+  master_netdev, master_ip = _GetMasterInfo()
+  if not master_netdev:
     return False
 
-  return True
+  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
+    if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT,
+                     source=constants.LOCALHOST_IP_ADDRESS):
+      # we already have the ip:
+      logging.debug("Already started")
+    else:
+      logging.error("Someone else has the master ip, not activating")
+      ok = False
+  else:
+    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
+                           "dev", master_netdev, "label",
+                           "%s:0" % master_netdev])
+    if result.failed:
+      logging.error("Can't activate master IP: %s", result.output)
+      ok = False
+
+    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
+                           "-s", master_ip, master_ip])
+    # we'll ignore the exit code of arping
+
+  # and now start the master and rapi daemons
+  if start_daemons:
+    for daemon in 'ganeti-masterd', 'ganeti-rapi':
+      result = utils.RunCmd([daemon])
+      if result.failed:
+        logging.error("Can't start daemon %s: %s", daemon, result.output)
+        ok = False
+  return ok
 
 
-def StopMaster():
+def StopMaster(stop_daemons):
   """Deactivate this node as master.
 
-  This runs the master stop script.
+  The function will always try to deactivate the IP address of the
+  master. Then, if the stop_daemons parameter is True, it will also
+  stop the master daemons (ganet-masterd and ganeti-rapi).
 
   """
-  result = utils.RunCmd([constants.MASTER_SCRIPT, "-d", "stop"])
+  master_netdev, master_ip = _GetMasterInfo()
+  if not master_netdev:
+    return False
 
+  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
+                         "dev", master_netdev])
   if result.failed:
-    logger.Error("could not deactivate cluster interface with command %s,"
-                 " error: '%s'" % (result.cmd, result.output))
-    return False
+    logging.error("Can't remove the master IP, error: %s", result.output)
+    # but otherwise ignore the failure
+
+  if stop_daemons:
+    # stop/kill the rapi and the master daemon
+    for daemon in constants.RAPI_PID, constants.MASTERD_PID:
+      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
 
   return True
 
@@ -101,7 +152,7 @@ def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
                                                     mkdir=True)
   except errors.OpExecError, err:
-    logger.Error("Error while processing user ssh files: %s" % err)
+    logging.exception("Error while processing user ssh files")
     return False
 
   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
@@ -126,8 +177,8 @@ def LeaveCluster():
 
   try:
     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
-  except errors.OpExecError, err:
-    logger.Error("Error while processing ssh files: %s" % err)
+  except errors.OpExecError:
+    logging.exception("Error while processing ssh files")
     return
 
   f = open(pub_key, 'r')
@@ -254,15 +305,18 @@ def GetVolumeList(vg_name):
                          "--separator=%s" % sep,
                          "-olv_name,lv_size,lv_attr", vg_name])
   if result.failed:
-    logger.Error("Failed to list logical volumes, lvs output: %s" %
-                 result.output)
+    logging.error("Failed to list logical volumes, lvs output: %s",
+                  result.output)
     return result.output
 
+  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
   for line in result.stdout.splitlines():
-    line = line.strip().rstrip(sep)
-    name, size, attr = line.split(sep)
-    if len(attr) != 6:
-      attr = '------'
+    line = line.strip()
+    match = valid_line_re.match(line)
+    if not match:
+      logging.error("Invalid line returned from lvs output: '%s'", line)
+      continue
+    name, size, attr = match.groups()
     inactive = attr[4] == '-'
     online = attr[5] == 'o'
     lvs[name] = (size, inactive, online)
@@ -288,8 +342,8 @@ def NodeVolumes():
                          "--separator=|",
                          "--options=lv_name,lv_size,devices,vg_name"])
   if result.failed:
-    logger.Error("Failed to list logical volumes, lvs output: %s" %
-                 result.output)
+    logging.error("Failed to list logical volumes, lvs output: %s",
+                  result.output)
     return {}
 
   def parse_dev(dev):
@@ -306,7 +360,8 @@ def NodeVolumes():
       'vg': line[3].strip(),
     }
 
-  return [map_line(line.split('|')) for line in result.stdout.splitlines()]
+  return [map_line(line.split('|')) for line in result.stdout.splitlines()
+          if line.count('|') >= 3]
 
 
 def BridgesExist(bridges_list):
@@ -335,7 +390,7 @@ def GetInstanceList():
   try:
     names = hypervisor.GetHypervisor().ListInstances()
   except errors.HypervisorError, err:
-    logger.Error("error enumerating instances: %s" % str(err))
+    logging.exception("Error enumerating instances")
     raise
 
   return names
@@ -413,12 +468,12 @@ def AddOSToInstance(instance, os_disk, swap_disk):
 
   os_device = instance.FindDisk(os_disk)
   if os_device is None:
-    logger.Error("Can't find this device-visible name '%s'" % os_disk)
+    logging.error("Can't find this device-visible name '%s'", os_disk)
     return False
 
   swap_device = instance.FindDisk(swap_disk)
   if swap_device is None:
-    logger.Error("Can't find this device-visible name '%s'" % swap_disk)
+    logging.error("Can't find this device-visible name '%s'", swap_disk)
     return False
 
   real_os_dev = _RecursiveFindBD(os_device)
@@ -445,9 +500,9 @@ def AddOSToInstance(instance, os_disk, swap_disk):
 
   result = utils.RunCmd(command)
   if result.failed:
-    logger.Error("os create command '%s' returned error: %s, logfile: %s,"
-                 " output: %s" %
-                 (command, result.fail_reason, logfile, result.output))
+    logging.error("os create command '%s' returned error: %s, logfile: %s,"
+                  " output: %s", command, result.fail_reason, logfile,
+                  result.output)
     return False
 
   return True
@@ -469,12 +524,12 @@ def RunRenameInstance(instance, old_name, os_disk, swap_disk):
 
   os_device = instance.FindDisk(os_disk)
   if os_device is None:
-    logger.Error("Can't find this device-visible name '%s'" % os_disk)
+    logging.error("Can't find this device-visible name '%s'", os_disk)
     return False
 
   swap_device = instance.FindDisk(swap_disk)
   if swap_device is None:
-    logger.Error("Can't find this device-visible name '%s'" % swap_disk)
+    logging.error("Can't find this device-visible name '%s'", swap_disk)
     return False
 
   real_os_dev = _RecursiveFindBD(os_device)
@@ -503,9 +558,8 @@ def RunRenameInstance(instance, old_name, os_disk, swap_disk):
   result = utils.RunCmd(command)
 
   if result.failed:
-    logger.Error("os create command '%s' returned error: %s"
-                 " output: %s" %
-                 (command, result.fail_reason, result.output))
+    logging.error("os create command '%s' returned error: %s output: %s",
+                  command, result.fail_reason, result.output)
     return False
 
   return True
@@ -534,8 +588,7 @@ def _GetVGInfo(vg_name):
                          "--nosuffix", "--units=m", "--separator=:", vg_name])
 
   if retval.failed:
-    errmsg = "volume group %s not present" % vg_name
-    logger.Error(errmsg)
+    logging.error("volume group %s not present", vg_name)
     return retdic
   valarr = retval.stdout.strip().rstrip(':').split(':')
   if len(valarr) == 3:
@@ -546,10 +599,10 @@ def _GetVGInfo(vg_name):
         "pv_count": int(valarr[2]),
         }
     except ValueError, err:
-      logger.Error("Fail to parse vgs output: %s" % str(err))
+      logging.exception("Fail to parse vgs output")
   else:
-    logger.Error("vgs output has the wrong number of fields (expected"
-                 " three): %s" % str(valarr))
+    logging.error("vgs output has the wrong number of fields (expected"
+                  " three): %s", str(valarr))
   return retdic
 
 
@@ -589,7 +642,7 @@ def StartInstance(instance, extra_args):
   try:
     hyper.StartInstance(instance, block_devices, extra_args)
   except errors.HypervisorError, err:
-    logger.Error("Failed to start instance: %s" % err)
+    logging.exception("Failed to start instance")
     return False
 
   return True
@@ -611,7 +664,7 @@ def ShutdownInstance(instance):
   try:
     hyper.StopInstance(instance)
   except errors.HypervisorError, err:
-    logger.Error("Failed to stop instance: %s" % err)
+    logging.error("Failed to stop instance")
     return False
 
   # test every 10secs for 2min
@@ -624,17 +677,18 @@ def ShutdownInstance(instance):
     time.sleep(10)
   else:
     # the shutdown did not succeed
-    logger.Error("shutdown of '%s' unsuccessful, using destroy" % instance)
+    logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
 
     try:
       hyper.StopInstance(instance, force=True)
     except errors.HypervisorError, err:
-      logger.Error("Failed to stop instance: %s" % err)
+      logging.exception("Failed to stop instance")
       return False
 
     time.sleep(1)
     if instance.name in GetInstanceList():
-      logger.Error("could not shutdown instance '%s' even by destroy")
+      logging.error("could not shutdown instance '%s' even by destroy",
+                    instance.name)
       return False
 
   return True
@@ -651,7 +705,7 @@ def RebootInstance(instance, reboot_type, extra_args):
   running_instances = GetInstanceList()
 
   if instance.name not in running_instances:
-    logger.Error("Cannot reboot instance that is not running")
+    logging.error("Cannot reboot instance that is not running")
     return False
 
   hyper = hypervisor.GetHypervisor()
@@ -659,14 +713,14 @@ def RebootInstance(instance, reboot_type, extra_args):
     try:
       hyper.RebootInstance(instance)
     except errors.HypervisorError, err:
-      logger.Error("Failed to soft reboot instance: %s" % err)
+      logging.exception("Failed to soft reboot instance")
       return False
   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
     try:
       ShutdownInstance(instance)
       StartInstance(instance, extra_args)
     except errors.HypervisorError, err:
-      logger.Error("Failed to hard reboot instance: %s" % err)
+      logging.exception("Failed to hard reboot instance")
       return False
   else:
     raise errors.ParameterError("reboot_type invalid")
@@ -685,7 +739,7 @@ def MigrateInstance(instance, target, live):
     hyper.MigrateInstance(instance, target, live)
   except errors.HypervisorError, err:
     msg = "Failed to migrate instance: %s" % str(err)
-    logger.Error(msg)
+    logging.error(msg)
     return (False, msg)
   return (True, "Migration successfull")
 
@@ -718,7 +772,7 @@ def CreateBlockDevice(disk, size, owner, on_primary, info):
   try:
     device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
     if device is not None:
-      logger.Info("removing existing device %s" % disk)
+      logging.info("removing existing device %s", disk)
       device.Remove()
   except errors.BlockDeviceError, err:
     pass
@@ -731,7 +785,7 @@ def CreateBlockDevice(disk, size, owner, on_primary, info):
   if on_primary or disk.AssembleOnSecondary():
     if not device.Assemble():
       errorstring = "Can't assemble device after creation"
-      logger.Error(errorstring)
+      logging.error(errorstring)
       raise errors.BlockDeviceError("%s, very unusual event - check the node"
                                     " daemon logs" % errorstring)
     device.SetSyncSpeed(constants.SYNC_SPEED)
@@ -758,7 +812,7 @@ def RemoveBlockDevice(disk):
     rdev = _RecursiveFindBD(disk, allow_partial=True)
   except errors.BlockDeviceError, err:
     # probably can't attach
-    logger.Info("Can't attach to device %s in remove" % disk)
+    logging.info("Can't attach to device %s in remove", disk)
     rdev = None
   if rdev is not None:
     r_path = rdev.dev_path
@@ -804,7 +858,7 @@ def _RecursiveAssembleBD(disk, owner, as_primary):
         if children.count(None) >= mcn:
           raise
         cdev = None
-        logger.Debug("Error in child activation: %s" % str(err))
+        logging.debug("Error in child activation: %s", str(err))
       children.append(cdev)
 
   if as_primary or disk.AssembleOnSecondary():
@@ -868,12 +922,12 @@ def MirrorAddChildren(parent_cdev, new_cdevs):
   """
   parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
   if parent_bdev is None:
-    logger.Error("Can't find parent device")
+    logging.error("Can't find parent device")
     return False
   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
   if new_bdevs.count(None) > 0:
-    logger.Error("Can't find new device(s) to add: %s:%s" %
-                 (new_bdevs, new_cdevs))
+    logging.error("Can't find new device(s) to add: %s:%s",
+                  new_bdevs, new_cdevs)
     return False
   parent_bdev.AddChildren(new_bdevs)
   return True
@@ -885,7 +939,7 @@ def MirrorRemoveChildren(parent_cdev, new_cdevs):
   """
   parent_bdev = _RecursiveFindBD(parent_cdev)
   if parent_bdev is None:
-    logger.Error("Can't find parent in remove children: %s" % parent_cdev)
+    logging.error("Can't find parent in remove children: %s", parent_cdev)
     return False
   devs = []
   for disk in new_cdevs:
@@ -893,8 +947,8 @@ def MirrorRemoveChildren(parent_cdev, new_cdevs):
     if rpath is None:
       bd = _RecursiveFindBD(disk)
       if bd is None:
-        logger.Error("Can't find dynamic device %s while removing children" %
-                     disk)
+        logging.error("Can't find dynamic device %s while removing children",
+                      disk)
         return False
       else:
         devs.append(bd.dev_path)
@@ -974,8 +1028,8 @@ def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
 
   """
   if not os.path.isabs(file_name):
-    logger.Error("Filename passed to UploadFile is not absolute: '%s'" %
-                 file_name)
+    logging.error("Filename passed to UploadFile is not absolute: '%s'",
+                  file_name)
     return False
 
   allowed_files = [
@@ -983,11 +1037,12 @@ def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
     constants.ETC_HOSTS,
     constants.SSH_KNOWN_HOSTS_FILE,
     constants.VNC_PASSWORD_FILE,
+    constants.JOB_QUEUE_SERIAL_FILE,
     ]
   allowed_files.extend(ssconf.SimpleStore().GetFileList())
   if file_name not in allowed_files:
-    logger.Error("Filename passed to UploadFile not in allowed"
-                 " upload targets: '%s'" % file_name)
+    logging.error("Filename passed to UploadFile not in allowed"
+                 " upload targets: '%s'", file_name)
     return False
 
   utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
@@ -1071,8 +1126,7 @@ def DiagnoseOS(top_dirs=None):
       try:
         f_names = utils.ListVisibleFiles(dir_name)
       except EnvironmentError, err:
-        logger.Error("Can't list the OS directory %s: %s" %
-                     (dir_name, str(err)))
+        logging.exception("Can't list the OS directory %s", dir_name)
         break
       for name in f_names:
         try:
@@ -1253,9 +1307,8 @@ def ExportSnapshot(disk, dest_node, instance):
   result = utils.RunCmd(command)
 
   if result.failed:
-    logger.Error("os snapshot export command '%s' returned error: %s"
-                 " output: %s" %
-                 (command, result.fail_reason, result.output))
+    logging.error("os snapshot export command '%s' returned error: %s"
+                  " output: %s", command, result.fail_reason, result.output)
     return False
 
   return True
@@ -1295,7 +1348,8 @@ def FinalizeExport(instance, snap_disks):
     config.set(constants.INISECT_INS, 'nic%d_mac' %
                nic_count, '%s' % nic.mac)
     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
-    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count, '%s' % nic.bridge)
+    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
+               '%s' % nic.bridge)
   # TODO: redundant: on load can read nics until it doesn't exist
   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
 
@@ -1363,12 +1417,12 @@ def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
 
   os_device = instance.FindDisk(os_disk)
   if os_device is None:
-    logger.Error("Can't find this device-visible name '%s'" % os_disk)
+    logging.error("Can't find this device-visible name '%s'", os_disk)
     return False
 
   swap_device = instance.FindDisk(swap_disk)
   if swap_device is None:
-    logger.Error("Can't find this device-visible name '%s'" % swap_disk)
+    logging.error("Can't find this device-visible name '%s'", swap_disk)
     return False
 
   real_os_dev = _RecursiveFindBD(os_device)
@@ -1403,9 +1457,8 @@ def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
   result = utils.RunCmd(command)
 
   if result.failed:
-    logger.Error("os import command '%s' returned error: %s"
-                 " output: %s" %
-                 (command, result.fail_reason, result.output))
+    logging.error("os import command '%s' returned error: %s"
+                  " output: %s", command, result.fail_reason, result.output)
     return False
 
   return True
@@ -1466,8 +1519,7 @@ def RenameBlockDevices(devlist):
         # cache? for now, we only lose lvm data when we rename, which
         # is less critical than DRBD or MD
     except errors.BlockDeviceError, err:
-      logger.Error("Can't rename device '%s' to '%s': %s" %
-                   (dev, unique_id, err))
+      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
       result = False
   return result
 
@@ -1490,9 +1542,9 @@ def _TransformFileStorageDir(file_storage_dir):
   base_file_storage_dir = ssconf.SimpleStore().GetFileStorageDir()
   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
       base_file_storage_dir):
-    logger.Error("file storage directory '%s' is not under base file"
-                 " storage directory '%s'" %
-                 (file_storage_dir, base_file_storage_dir))
+    logging.error("file storage directory '%s' is not under base file"
+                  " storage directory '%s'",
+                  file_storage_dir, base_file_storage_dir)
     return None
   return file_storage_dir
 
@@ -1515,14 +1567,14 @@ def CreateFileStorageDir(file_storage_dir):
   else:
     if os.path.exists(file_storage_dir):
       if not os.path.isdir(file_storage_dir):
-        logger.Error("'%s' is not a directory" % file_storage_dir)
+        logging.error("'%s' is not a directory", file_storage_dir)
         result = False,
     else:
       try:
         os.makedirs(file_storage_dir, 0750)
       except OSError, err:
-        logger.Error("Cannot create file storage directory '%s': %s" %
-                     (file_storage_dir, err))
+        logging.error("Cannot create file storage directory '%s': %s",
+                      file_storage_dir, err)
         result = False,
   return result
 
@@ -1547,14 +1599,14 @@ def RemoveFileStorageDir(file_storage_dir):
   else:
     if os.path.exists(file_storage_dir):
       if not os.path.isdir(file_storage_dir):
-        logger.Error("'%s' is not a directory" % file_storage_dir)
+        logging.error("'%s' is not a directory", file_storage_dir)
         result = False,
       # deletes dir only if empty, otherwise we want to return False
       try:
         os.rmdir(file_storage_dir)
       except OSError, err:
-        logger.Error("Cannot remove file storage directory '%s': %s" %
-                     (file_storage_dir, err))
+        logging.exception("Cannot remove file storage directory '%s'",
+                          file_storage_dir)
         result = False,
   return result
 
@@ -1582,16 +1634,16 @@ def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
         try:
           os.rename(old_file_storage_dir, new_file_storage_dir)
         except OSError, err:
-          logger.Error("Cannot rename '%s' to '%s': %s"
-                       % (old_file_storage_dir, new_file_storage_dir, err))
+          logging.exception("Cannot rename '%s' to '%s'",
+                            old_file_storage_dir, new_file_storage_dir)
           result =  False,
       else:
-        logger.Error("'%s' is not a directory" % old_file_storage_dir)
+        logging.error("'%s' is not a directory", old_file_storage_dir)
         result = False,
     else:
       if os.path.exists(old_file_storage_dir):
-        logger.Error("Cannot rename '%s' to '%s'. Both locations exist." %
-                     old_file_storage_dir, new_file_storage_dir)
+        logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
+                      old_file_storage_dir, new_file_storage_dir)
         result = False,
   return result
 
@@ -1681,7 +1733,7 @@ class HooksRunner(object):
             fd.close()
           except EnvironmentError, err:
             # just log the error
-            #logger.Error("While closing fd %s: %s" % (fd, err))
+            #logging.exception("Error while closing fd %s", fd)
             pass
 
     return result == 0, output
@@ -1791,7 +1843,7 @@ class DevCacheManager(object):
 
     """
     if dev_path is None:
-      logger.Error("DevCacheManager.UpdateCache got a None dev_path")
+      logging.error("DevCacheManager.UpdateCache got a None dev_path")
       return
     fpath = cls._ConvertPath(dev_path)
     if on_primary:
@@ -1804,8 +1856,7 @@ class DevCacheManager(object):
     try:
       utils.WriteFile(fpath, data=fdata)
     except EnvironmentError, err:
-      logger.Error("Can't update bdev cache for %s, error %s" %
-                   (dev_path, str(err)))
+      logging.exception("Can't update bdev cache for %s", dev_path)
 
   @classmethod
   def RemoveCache(cls, dev_path):
@@ -1813,11 +1864,10 @@ class DevCacheManager(object):
 
     """
     if dev_path is None:
-      logger.Error("DevCacheManager.RemoveCache got a None dev_path")
+      logging.error("DevCacheManager.RemoveCache got a None dev_path")
       return
     fpath = cls._ConvertPath(dev_path)
     try:
       utils.RemoveFile(fpath)
     except EnvironmentError, err:
-      logger.Error("Can't update bdev cache for %s, error %s" %
-                   (dev_path, str(err)))
+      logging.exception("Can't update bdev cache for %s", dev_path)