gnt-* use the correct opcode slot to build opcodes
[ganeti-local] / lib / backend.py
index 456ba69..4e40cb7 100644 (file)
 # 02110-1301, USA.
 
 
-"""Functions used by the node daemon"""
+"""Functions used by the node daemon
+
+@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
+     the L{UploadFile} function
+
+"""
 
 
 import os
@@ -46,6 +51,9 @@ from ganeti import objects
 from ganeti import ssconf
 
 
+_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
+
+
 class RPCFail(Exception):
   """Class denoting RPC failure.
 
@@ -53,6 +61,7 @@ class RPCFail(Exception):
 
   """
 
+
 def _Fail(msg, *args, **kwargs):
   """Log an error and the raise an RPCFail exception.
 
@@ -145,6 +154,32 @@ def _CleanDirectory(path, exclude=None):
       utils.RemoveFile(full_name)
 
 
+def _BuildUploadFileList():
+  """Build the list of allowed upload files.
+
+  This is abstracted so that it's built only once at module import time.
+
+  """
+  allowed_files = set([
+    constants.CLUSTER_CONF_FILE,
+    constants.ETC_HOSTS,
+    constants.SSH_KNOWN_HOSTS_FILE,
+    constants.VNC_PASSWORD_FILE,
+    constants.RAPI_CERT_FILE,
+    constants.RAPI_USERS_FILE,
+    constants.HMAC_CLUSTER_KEY,
+    ])
+
+  for hv_name in constants.HYPER_TYPES:
+    hv_class = hypervisor.GetHypervisorClass(hv_name)
+    allowed_files.update(hv_class.GetAncillaryFiles())
+
+  return frozenset(allowed_files)
+
+
+_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
+
+
 def JobQueuePurge():
   """Removes job queue files and archived jobs.
 
@@ -177,7 +212,7 @@ def GetMasterInfo():
   return (master_netdev, master_ip, master_node)
 
 
-def StartMaster(start_daemons):
+def StartMaster(start_daemons, no_voting):
   """Activate local node as master node.
 
   The function will always try activate the IP address of the master
@@ -187,6 +222,9 @@ def StartMaster(start_daemons):
   @type start_daemons: boolean
   @param start_daemons: whether to also start the master
       daemons (ganeti-masterd and ganeti-rapi)
+  @type no_voting: boolean
+  @param no_voting: whether to start ganeti-masterd without a node vote
+      (if start_daemons is True), but still non-interactively
   @rtype: None
 
   """
@@ -217,8 +255,17 @@ def StartMaster(start_daemons):
 
   # and now start the master and rapi daemons
   if start_daemons:
-    for daemon in 'ganeti-masterd', 'ganeti-rapi':
-      result = utils.RunCmd([daemon])
+    daemons_params = {
+        'ganeti-masterd': [],
+        'ganeti-rapi': [],
+        }
+    if no_voting:
+      daemons_params['ganeti-masterd'].append('--no-voting')
+      daemons_params['ganeti-masterd'].append('--yes-do-it')
+    for daemon in daemons_params:
+      cmd = [daemon]
+      cmd.extend(daemons_params[daemon])
+      result = utils.RunCmd(cmd)
       if result.failed:
         msg = "Can't start daemon %s: %s" % (daemon, result.output)
         logging.error(msg)
@@ -255,7 +302,7 @@ def StopMaster(stop_daemons):
 
   if stop_daemons:
     # stop/kill the rapi and the master daemon
-    for daemon in constants.RAPI_PID, constants.MASTERD_PID:
+    for daemon in constants.RAPI, constants.MASTERD:
       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
 
 
@@ -321,17 +368,25 @@ def LeaveCluster():
   try:
     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
 
-    f = open(pub_key, 'r')
-    try:
-      utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
-    finally:
-      f.close()
+    utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
 
     utils.RemoveFile(priv_key)
     utils.RemoveFile(pub_key)
   except errors.OpExecError:
     logging.exception("Error while processing ssh files")
 
+  try:
+    utils.RemoveFile(constants.HMAC_CLUSTER_KEY)
+    utils.RemoveFile(constants.RAPI_CERT_FILE)
+    utils.RemoveFile(constants.SSL_CERT_FILE)
+  except:
+    logging.exception("Error while removing cluster secrets")
+
+  confd_pid = utils.ReadPidFile(utils.DaemonPidFileName(constants.CONFD))
+
+  if confd_pid:
+    utils.KillProcess(confd_pid, timeout=2)
+
   # Raise a custom exception (handled in ganeti-noded)
   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
 
@@ -363,11 +418,7 @@ def GetNodeInfo(vgname, hypervisor_type):
   if hyp_info is not None:
     outputarray.update(hyp_info)
 
-  f = open("/proc/sys/kernel/random/boot_id", 'r')
-  try:
-    outputarray["bootid"] = f.read(128).rstrip("\n")
-  finally:
-    f.close()
+  outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
 
   return outputarray
 
@@ -433,7 +484,7 @@ def VerifyNode(what, cluster_name):
       tmp[my_name] = ("Can't find my own primary/secondary IP"
                       " in the node list")
     else:
-      port = utils.GetNodeDaemonPort()
+      port = utils.GetDaemonPort(constants.NODED)
       for name, pip, sip in what[constants.NV_NODENETTEST]:
         fail = []
         if not utils.TcpPing(pip, port, source=my_pip):
@@ -508,6 +559,11 @@ def GetVolumeList(vg_name):
     name, size, attr = match.groups()
     inactive = attr[4] == '-'
     online = attr[5] == 'o'
+    virtual = attr[0] == 'v'
+    if virtual:
+      # we don't want to report such volumes as existing, since they
+      # don't really hold data
+      continue
     lvs[name] = (size, inactive, online)
 
   return lvs
@@ -904,40 +960,48 @@ def StartInstance(instance):
     _Fail("Hypervisor error: %s", err, exc=True)
 
 
-def InstanceShutdown(instance):
+def InstanceShutdown(instance, timeout):
   """Shut an instance down.
 
   @note: this functions uses polling with a hardcoded timeout.
 
   @type instance: L{objects.Instance}
   @param instance: the instance object
+  @type timeout: integer
+  @param timeout: maximum timeout for soft shutdown
   @rtype: None
 
   """
   hv_name = instance.hypervisor
-  running_instances = GetInstanceList([hv_name])
+  hyper = hypervisor.GetHypervisor(hv_name)
+  running_instances = hyper.ListInstances()
   iname = instance.name
 
   if iname not in running_instances:
     logging.info("Instance %s not running, doing nothing", iname)
     return
 
-  hyper = hypervisor.GetHypervisor(hv_name)
-  try:
-    hyper.StopInstance(instance)
-  except errors.HypervisorError, err:
-    _Fail("Failed to stop instance %s: %s", iname, err)
-
-  # test every 10secs for 2min
+  start = time.time()
+  end = start + timeout
+  sleep_time = 1
 
-  time.sleep(1)
-  for _ in range(11):
-    if instance.name not in GetInstanceList([hv_name]):
+  tried_once = False
+  while not tried_once and time.time() < end:
+    try:
+      hyper.StopInstance(instance, retry=tried_once)
+    except errors.HypervisorError, err:
+      _Fail("Failed to stop instance %s: %s", iname, err)
+    tried_once = True
+    time.sleep(sleep_time)
+    if instance.name not in hyper.ListInstances():
       break
-    time.sleep(10)
+    if sleep_time < 5:
+      # 1.2 behaves particularly good for our case:
+      # it gives us 10 increasing steps and caps just slightly above 5 seconds
+      sleep_time *= 1.2
   else:
     # the shutdown did not succeed
-    logging.error("Shutdown of '%s' unsuccessful, using destroy", iname)
+    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
 
     try:
       hyper.StopInstance(instance, force=True)
@@ -951,7 +1015,7 @@ def InstanceShutdown(instance):
   _RemoveBlockDevLinks(iname, instance.disks)
 
 
-def InstanceReboot(instance, reboot_type):
+def InstanceReboot(instance, reboot_type, shutdown_timeout):
   """Reboot an instance.
 
   @type instance: L{objects.Instance}
@@ -967,6 +1031,8 @@ def InstanceReboot(instance, reboot_type):
         not accepted here, since that mode is handled differently, in
         cmdlib, and translates into full stop and start of the
         instance (instead of a call_instance_reboot RPC)
+  @type timeout: integer
+  @param timeout: maximum timeout for soft shutdown
   @rtype: None
 
   """
@@ -983,7 +1049,7 @@ def InstanceReboot(instance, reboot_type):
       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
     try:
-      InstanceShutdown(instance)
+      InstanceShutdown(instance, shutdown_timeout)
       return StartInstance(instance)
     except errors.HypervisorError, err:
       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
@@ -1341,7 +1407,9 @@ def BlockdevGetmirrorstatus(disks):
     rbd = _RecursiveFindBD(dsk)
     if rbd is None:
       _Fail("Can't find device %s", dsk)
+
     stats.append(rbd.CombinedSyncStatus())
+
   return stats
 
 
@@ -1372,19 +1440,94 @@ def BlockdevFind(disk):
 
   @type disk: L{objects.Disk}
   @param disk: the disk to find
-  @rtype: None or tuple
-  @return: None if the disk cannot be found, otherwise a
-      tuple (device_path, major, minor, sync_percent,
-      estimated_time, is_degraded)
+  @rtype: None or objects.BlockDevStatus
+  @return: None if the disk cannot be found, otherwise a the current
+           information
 
   """
   try:
     rbd = _RecursiveFindBD(disk)
   except errors.BlockDeviceError, err:
     _Fail("Failed to find device: %s", err, exc=True)
+
   if rbd is None:
     return None
-  return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
+
+  return rbd.GetSyncStatus()
+
+
+def BlockdevGetsize(disks):
+  """Computes the size of the given disks.
+
+  If a disk is not found, returns None instead.
+
+  @type disks: list of L{objects.Disk}
+  @param disks: the list of disk to compute the size for
+  @rtype: list
+  @return: list with elements None if the disk cannot be found,
+      otherwise the size
+
+  """
+  result = []
+  for cf in disks:
+    try:
+      rbd = _RecursiveFindBD(cf)
+    except errors.BlockDeviceError, err:
+      result.append(None)
+      continue
+    if rbd is None:
+      result.append(None)
+    else:
+      result.append(rbd.GetActualSize())
+  return result
+
+
+def BlockdevExport(disk, dest_node, dest_path, cluster_name):
+  """Export a block device to a remote node.
+
+  @type disk: L{objects.Disk}
+  @param disk: the description of the disk to export
+  @type dest_node: str
+  @param dest_node: the destination node to export to
+  @type dest_path: str
+  @param dest_path: the destination path on the target node
+  @type cluster_name: str
+  @param cluster_name: the cluster name, needed for SSH hostalias
+  @rtype: None
+
+  """
+  real_disk = _RecursiveFindBD(disk)
+  if real_disk is None:
+    _Fail("Block device '%s' is not set up", disk)
+
+  real_disk.Open()
+
+  # the block size on the read dd is 1MiB to match our units
+  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
+                               "dd if=%s bs=1048576 count=%s",
+                               real_disk.dev_path, str(disk.size))
+
+  # we set here a smaller block size as, due to ssh buffering, more
+  # than 64-128k will mostly ignored; we use nocreat to fail if the
+  # device is not already there or we pass a wrong path; we use
+  # notrunc to no attempt truncate on an LV device; we use oflag=dsync
+  # to not buffer too much memory; this means that at best, we flush
+  # every 64k, which will not be very fast
+  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
+                                " oflag=dsync", dest_path)
+
+  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
+                                                   constants.GANETI_RUNAS,
+                                                   destcmd)
+
+  # all commands have been checked, so we're safe to combine them
+  command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
+
+  result = utils.RunCmd(["bash", "-c", command])
+
+  if result.failed:
+    _Fail("Disk copy command '%s' returned error: %s"
+          " output: %s", command, result.fail_reason, result.output)
 
 
 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
@@ -1413,20 +1556,7 @@ def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
   if not os.path.isabs(file_name):
     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
 
-  allowed_files = set([
-    constants.CLUSTER_CONF_FILE,
-    constants.ETC_HOSTS,
-    constants.SSH_KNOWN_HOSTS_FILE,
-    constants.VNC_PASSWORD_FILE,
-    constants.RAPI_CERT_FILE,
-    constants.RAPI_USERS_FILE,
-    ])
-
-  for hv_name in constants.HYPER_TYPES:
-    hv_class = hypervisor.GetHypervisor(hv_name)
-    allowed_files.update(hv_class.GetAncillaryFiles())
-
-  if file_name not in allowed_files:
+  if file_name not in _ALLOWED_UPLOAD_FILES:
     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
           file_name)
 
@@ -1478,17 +1608,17 @@ def _OSOndiskAPIVersion(name, os_dir):
       data holding either the vaid versions or an error message
 
   """
-  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
+  api_file = os.path.sep.join([os_dir, constants.OS_API_FILE])
 
   try:
     st = os.stat(api_file)
   except EnvironmentError, err:
-    return False, ("Required file 'ganeti_api_version' file not"
-                   " found under path %s: %s" % (os_dir, _ErrnoOrStr(err)))
+    return False, ("Required file '%s' not found under path %s: %s" %
+                   (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
 
   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
-    return False, ("File 'ganeti_api_version' file at %s is not"
-                   " a regular file" % os_dir)
+    return False, ("File '%s' in %s is not a regular file" %
+                   (constants.OS_API_FILE, os_dir))
 
   try:
     api_versions = utils.ReadFile(api_file).splitlines()
@@ -1513,12 +1643,13 @@ def DiagnoseOS(top_dirs=None):
       search (if not given defaults to
       L{constants.OS_SEARCH_PATH})
   @rtype: list of L{objects.OS}
-  @return: a list of tuples (name, path, status, diagnose)
+  @return: a list of tuples (name, path, status, diagnose, variants)
       for all (potential) OSes under all search paths, where:
           - name is the (potential) OS name
           - path is the full path to the OS
           - status True/False is the validity of the OS
           - diagnose is the error message for an invalid OS, otherwise empty
+          - variants is a list of supported OS variants, if any
 
   """
   if top_dirs is None:
@@ -1537,9 +1668,11 @@ def DiagnoseOS(top_dirs=None):
         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
         if status:
           diagnose = ""
+          variants = os_inst.supported_variants
         else:
           diagnose = os_inst
-        result.append((name, os_path, status, diagnose))
+          variants = []
+        result.append((name, os_path, status, diagnose, variants))
 
   return result
 
@@ -1574,31 +1707,47 @@ def _TryOSFromDisk(name, base_dir=None):
     return False, ("API version mismatch for path '%s': found %s, want %s." %
                    (os_dir, api_versions, constants.OS_API_VERSIONS))
 
-  # OS Scripts dictionary, we will populate it with the actual script names
-  os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
+  # OS Files dictionary, we will populate it with the absolute path names
+  os_files = dict.fromkeys(constants.OS_SCRIPTS)
+
+  if max(api_versions) >= constants.OS_API_V15:
+    os_files[constants.OS_VARIANTS_FILE] = ''
 
-  for script in os_scripts:
-    os_scripts[script] = os.path.sep.join([os_dir, script])
+  for name in os_files:
+    os_files[name] = os.path.sep.join([os_dir, name])
 
     try:
-      st = os.stat(os_scripts[script])
+      st = os.stat(os_files[name])
     except EnvironmentError, err:
-      return False, ("Script '%s' under path '%s' is missing (%s)" %
-                     (script, os_dir, _ErrnoOrStr(err)))
-
-    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
-      return False, ("Script '%s' under path '%s' is not executable" %
-                     (script, os_dir))
+      return False, ("File '%s' under path '%s' is missing (%s)" %
+                     (name, os_dir, _ErrnoOrStr(err)))
 
     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
-      return False, ("Script '%s' under path '%s' is not a regular file" %
-                     (script, os_dir))
+      return False, ("File '%s' under path '%s' is not a regular file" %
+                     (name, os_dir))
+
+    if name in constants.OS_SCRIPTS:
+      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
+        return False, ("File '%s' under path '%s' is not executable" %
+                       (name, os_dir))
+
+  variants = None
+  if constants.OS_VARIANTS_FILE in os_files:
+    variants_file = os_files[constants.OS_VARIANTS_FILE]
+    try:
+      variants = utils.ReadFile(variants_file).splitlines()
+    except EnvironmentError, err:
+      return False, ("Error while reading the OS variants file at %s: %s" %
+                     (variants_file, _ErrnoOrStr(err)))
+    if not variants:
+      return False, ("No supported os variant found")
 
   os_obj = objects.OS(name=name, path=os_dir,
-                      create_script=os_scripts[constants.OS_SCRIPT_CREATE],
-                      export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
-                      import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
-                      rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
+                      create_script=os_files[constants.OS_SCRIPT_CREATE],
+                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
+                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
+                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
+                      supported_variants=variants,
                       api_versions=api_versions)
   return True, os_obj
 
@@ -1621,7 +1770,8 @@ def OSFromDisk(name, base_dir=None):
   @raise RPCFail: if we don't find a valid OS
 
   """
-  status, payload = _TryOSFromDisk(name, base_dir)
+  name_only = name.split("+", 1)[0]
+  status, payload = _TryOSFromDisk(name_only, base_dir)
 
   if not status:
     _Fail(payload)
@@ -1653,6 +1803,12 @@ def OSEnvironment(instance, os, debug=0):
   result['DISK_COUNT'] = '%d' % len(instance.disks)
   result['NIC_COUNT'] = '%d' % len(instance.nics)
   result['DEBUG_LEVEL'] = '%d' % debug
+  if api_version >= constants.OS_API_V15:
+    try:
+      variant = instance.os.split('+', 1)[1]
+    except IndexError:
+      variant = os.supported_variants[0]
+    result['OS_VARIANT'] = variant
   for idx, disk in enumerate(instance.disks):
     real_disk = _RecursiveFindBD(disk)
     if real_disk is None:
@@ -1787,8 +1943,8 @@ def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
   # the target command is built out of three individual commands,
   # which are joined by pipes; we check each individual command for
   # valid parameters
-  expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
-                               export_script, logfile)
+  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
+                               inst_os.path, export_script, logfile)
 
   comprcmd = "gzip"
 
@@ -1801,7 +1957,7 @@ def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
   # all commands have been checked, so we're safe to combine them
   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
 
-  result = utils.RunCmd(command, env=export_env)
+  result = utils.RunCmd(["bash", "-c", command], env=export_env)
 
   if result.failed:
     _Fail("OS snapshot export command '%s' returned error: %s"
@@ -2250,7 +2406,7 @@ def DemoteFromMC():
   master, myself = ssconf.GetMasterAndMyself()
   if master == myself:
     _Fail("ssconf status shows I'm the master node, will not demote")
-  pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
+  pid_file = utils.DaemonPidFileName(constants.MASTERD)
   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
     _Fail("The master daemon is running, will not demote")
   try: