Add ParseCpuMask() utility function
[ganeti-local] / lib / backend.py
index 85bb9b4..f3c2fcf 100644 (file)
@@ -58,6 +58,7 @@ from ganeti import bdev
 from ganeti import objects
 from ganeti import ssconf
 from ganeti import serializer
+from ganeti import netutils
 
 
 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
@@ -193,6 +194,7 @@ def _BuildUploadFileList():
     constants.RAPI_CERT_FILE,
     constants.RAPI_USERS_FILE,
     constants.CONFD_HMAC_KEY,
+    constants.CLUSTER_DOMAIN_SECRET_FILE,
     ])
 
   for hv_name in constants.HYPER_TYPES:
@@ -257,8 +259,8 @@ def StartMaster(start_daemons, no_voting):
   master_netdev, master_ip, _ = GetMasterInfo()
 
   err_msgs = []
-  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
-    if utils.OwnIpAddress(master_ip):
+  if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
+    if netutils.OwnIpAddress(master_ip):
       # we already have the ip:
       logging.debug("Master IP already configured, doing nothing")
     else:
@@ -486,8 +488,8 @@ def VerifyNode(what, cluster_name):
 
   """
   result = {}
-  my_name = utils.HostInfo().name
-  port = utils.GetDaemonPort(constants.NODED)
+  my_name = netutils.HostInfo().name
+  port = netutils.GetDaemonPort(constants.NODED)
 
   if constants.NV_HYPERVISOR in what:
     result[constants.NV_HYPERVISOR] = tmp = {}
@@ -524,10 +526,10 @@ def VerifyNode(what, cluster_name):
     else:
       for name, pip, sip in what[constants.NV_NODENETTEST]:
         fail = []
-        if not utils.TcpPing(pip, port, source=my_pip):
+        if not netutils.TcpPing(pip, port, source=my_pip):
           fail.append("primary")
         if sip != pip:
-          if not utils.TcpPing(sip, port, source=my_sip):
+          if not netutils.TcpPing(sip, port, source=my_sip):
             fail.append("secondary")
         if fail:
           tmp[name] = ("failure using the %s interface(s)" %
@@ -538,10 +540,10 @@ def VerifyNode(what, cluster_name):
     # rest of the function)
     master_name, master_ip = what[constants.NV_MASTERIP]
     if master_name == my_name:
-      source = constants.LOCALHOST_IP_ADDRESS
+      source = constants.IP4_ADDRESS_LOCALHOST
     else:
       source = None
-    result[constants.NV_MASTERIP] = utils.TcpPing(master_ip, port,
+    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
                                                   source=source)
 
   if constants.NV_LVLIST in what:
@@ -583,6 +585,16 @@ def VerifyNode(what, cluster_name):
       used_minors = str(err)
     result[constants.NV_DRBDLIST] = used_minors
 
+  if constants.NV_DRBDHELPER in what:
+    status = True
+    try:
+      payload = bdev.BaseDRBD.GetUsermodeHelper()
+    except errors.BlockDeviceError, err:
+      logging.error("Can't get DRBD usermode helper: %s", str(err))
+      status = False
+      payload = str(err)
+    result[constants.NV_DRBDHELPER] = (status, payload)
+
   if constants.NV_NODESETUP in what:
     result[constants.NV_NODESETUP] = tmpr = []
     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
@@ -598,6 +610,9 @@ def VerifyNode(what, cluster_name):
   if constants.NV_TIME in what:
     result[constants.NV_TIME] = utils.SplitTime(time.time())
 
+  if constants.NV_OSLIST in what:
+    result[constants.NV_OSLIST] = DiagnoseOS()
+
   return result
 
 
@@ -1773,13 +1788,16 @@ def DiagnoseOS(top_dirs=None):
       search (if not given defaults to
       L{constants.OS_SEARCH_PATH})
   @rtype: list of L{objects.OS}
-  @return: a list of tuples (name, path, status, diagnose, variants)
-      for all (potential) OSes under all search paths, where:
+  @return: a list of tuples (name, path, status, diagnose, variants,
+      parameters, api_version) for all (potential) OSes under all
+      search paths, where:
           - name is the (potential) OS name
           - path is the full path to the OS
           - status True/False is the validity of the OS
           - diagnose is the error message for an invalid OS, otherwise empty
           - variants is a list of supported OS variants, if any
+          - parameters is a list of (name, help) parameters, if any
+          - api_version is a list of support OS API versions
 
   """
   if top_dirs is None:
@@ -1799,10 +1817,13 @@ def DiagnoseOS(top_dirs=None):
         if status:
           diagnose = ""
           variants = os_inst.supported_variants
+          parameters = os_inst.supported_parameters
+          api_versions = os_inst.api_versions
         else:
           diagnose = os_inst
-          variants = []
-        result.append((name, os_path, status, diagnose, variants))
+          variants = parameters = api_versions = []
+        result.append((name, os_path, status, diagnose, variants,
+                       parameters, api_versions))
 
   return result
 
@@ -1844,6 +1865,11 @@ def _TryOSFromDisk(name, base_dir=None):
   if max(api_versions) >= constants.OS_API_V15:
     os_files[constants.OS_VARIANTS_FILE] = ''
 
+  if max(api_versions) >= constants.OS_API_V20:
+    os_files[constants.OS_PARAMETERS_FILE] = ''
+  else:
+    del os_files[constants.OS_SCRIPT_VERIFY]
+
   for filename in os_files:
     os_files[filename] = utils.PathJoin(os_dir, filename)
 
@@ -1862,7 +1888,7 @@ def _TryOSFromDisk(name, base_dir=None):
         return False, ("File '%s' under path '%s' is not executable" %
                        (filename, os_dir))
 
-  variants = None
+  variants = []
   if constants.OS_VARIANTS_FILE in os_files:
     variants_file = os_files[constants.OS_VARIANTS_FILE]
     try:
@@ -1873,12 +1899,25 @@ def _TryOSFromDisk(name, base_dir=None):
     if not variants:
       return False, ("No supported os variant found")
 
+  parameters = []
+  if constants.OS_PARAMETERS_FILE in os_files:
+    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
+    try:
+      parameters = utils.ReadFile(parameters_file).splitlines()
+    except EnvironmentError, err:
+      return False, ("Error while reading the OS parameters file at %s: %s" %
+                     (parameters_file, _ErrnoOrStr(err)))
+    parameters = [v.split(None, 1) for v in parameters]
+
   os_obj = objects.OS(name=name, path=os_dir,
                       create_script=os_files[constants.OS_SCRIPT_CREATE],
                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
+                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
+                                                 None),
                       supported_variants=variants,
+                      supported_parameters=parameters,
                       api_versions=api_versions)
   return True, os_obj
 
@@ -1910,13 +1949,13 @@ def OSFromDisk(name, base_dir=None):
   return payload
 
 
-def OSEnvironment(instance, inst_os, debug=0):
-  """Calculate the environment for an os script.
+def OSCoreEnv(inst_os, os_params, debug=0):
+  """Calculate the basic environment for an os script.
 
-  @type instance: L{objects.Instance}
-  @param instance: target instance for the os script run
   @type inst_os: L{objects.OS}
   @param inst_os: operating system for which the environment is being built
+  @type os_params: dict
+  @param os_params: the OS parameters
   @type debug: integer
   @param debug: debug level (0 or 1, for OS Api 10)
   @rtype: dict
@@ -1929,18 +1968,48 @@ def OSEnvironment(instance, inst_os, debug=0):
   api_version = \
     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
   result['OS_API_VERSION'] = '%d' % api_version
-  result['INSTANCE_NAME'] = instance.name
-  result['INSTANCE_OS'] = instance.os
-  result['HYPERVISOR'] = instance.hypervisor
-  result['DISK_COUNT'] = '%d' % len(instance.disks)
-  result['NIC_COUNT'] = '%d' % len(instance.nics)
+  result['OS_NAME'] = inst_os.name
   result['DEBUG_LEVEL'] = '%d' % debug
+
+  # OS variants
   if api_version >= constants.OS_API_V15:
     try:
-      variant = instance.os.split('+', 1)[1]
+      variant = inst_os.name.split('+', 1)[1]
     except IndexError:
       variant = inst_os.supported_variants[0]
     result['OS_VARIANT'] = variant
+
+  # OS params
+  for pname, pvalue in os_params.items():
+    result['OSP_%s' % pname.upper()] = pvalue
+
+  return result
+
+
+def OSEnvironment(instance, inst_os, debug=0):
+  """Calculate the environment for an os script.
+
+  @type instance: L{objects.Instance}
+  @param instance: target instance for the os script run
+  @type inst_os: L{objects.OS}
+  @param inst_os: operating system for which the environment is being built
+  @type debug: integer
+  @param debug: debug level (0 or 1, for OS Api 10)
+  @rtype: dict
+  @return: dict of environment variables
+  @raise errors.BlockDeviceError: if the block device
+      cannot be found
+
+  """
+  result = OSCoreEnv(inst_os, instance.osparams, debug=debug)
+
+  result['INSTANCE_NAME'] = instance.name
+  result['INSTANCE_OS'] = instance.os
+  result['HYPERVISOR'] = instance.hypervisor
+  result['DISK_COUNT'] = '%d' % len(instance.disks)
+  result['NIC_COUNT'] = '%d' % len(instance.nics)
+
+  # Disks
   for idx, disk in enumerate(instance.disks):
     real_disk = _OpenRealBD(disk)
     result['DISK_%d_PATH' % idx] = real_disk.dev_path
@@ -1953,6 +2022,8 @@ def OSEnvironment(instance, inst_os, debug=0):
     elif disk.dev_type == constants.LD_FILE:
       result['DISK_%d_BACKEND_TYPE' % idx] = \
         'file:%s' % disk.physical_id[0]
+
+  # NICs
   for idx, nic in enumerate(instance.nics):
     result['NIC_%d_MAC' % idx] = nic.mac
     if nic.ip:
@@ -1966,6 +2037,7 @@ def OSEnvironment(instance, inst_os, debug=0):
       result['NIC_%d_FRONTEND_TYPE' % idx] = \
         instance.hvparams[constants.HV_NIC_TYPE]
 
+  # HV/BE params
   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
     for key, value in source.items():
       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
@@ -2097,6 +2169,10 @@ def FinalizeExport(instance, snap_disks):
   for name, value in instance.beparams.items():
     config.set(constants.INISECT_BEP, name, str(value))
 
+  config.add_section(constants.INISECT_OSP)
+  for name, value in instance.osparams.items():
+    config.set(constants.INISECT_OSP, name, str(value))
+
   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                   data=config.Dumps())
   shutil.rmtree(finaldestdir, ignore_errors=True)
@@ -2134,7 +2210,7 @@ def ListExports():
 
   """
   if os.path.isdir(constants.EXPORT_DIR):
-    return utils.ListVisibleFiles(constants.EXPORT_DIR)
+    return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
   else:
     _Fail("No exports directory")
 
@@ -2356,24 +2432,6 @@ def JobQueueRename(old, new):
   utils.RenameFile(old, new, mkdir=True)
 
 
-def JobQueueSetDrainFlag(drain_flag):
-  """Set the drain flag for the queue.
-
-  This will set or unset the queue drain flag.
-
-  @type drain_flag: boolean
-  @param drain_flag: if True, will set the drain flag, otherwise reset it.
-  @rtype: truple
-  @return: always True, None
-  @warning: the function always returns True
-
-  """
-  if drain_flag:
-    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
-  else:
-    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
-
-
 def BlockdevClose(instance_name, disks):
   """Closes the given block devices.
 
@@ -2428,6 +2486,70 @@ def ValidateHVParams(hvname, hvparams):
     _Fail(str(err), log=False)
 
 
+def _CheckOSPList(os_obj, parameters):
+  """Check whether a list of parameters is supported by the OS.
+
+  @type os_obj: L{objects.OS}
+  @param os_obj: OS object to check
+  @type parameters: list
+  @param parameters: the list of parameters to check
+
+  """
+  supported = [v[0] for v in os_obj.supported_parameters]
+  delta = frozenset(parameters).difference(supported)
+  if delta:
+    _Fail("The following parameters are not supported"
+          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
+
+
+def ValidateOS(required, osname, checks, osparams):
+  """Validate the given OS' parameters.
+
+  @type required: boolean
+  @param required: whether absence of the OS should translate into
+      failure or not
+  @type osname: string
+  @param osname: the OS to be validated
+  @type checks: list
+  @param checks: list of the checks to run (currently only 'parameters')
+  @type osparams: dict
+  @param osparams: dictionary with OS parameters
+  @rtype: boolean
+  @return: True if the validation passed, or False if the OS was not
+      found and L{required} was false
+
+  """
+  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
+    _Fail("Unknown checks required for OS %s: %s", osname,
+          set(checks).difference(constants.OS_VALIDATE_CALLS))
+
+  name_only = osname.split("+", 1)[0]
+  status, tbv = _TryOSFromDisk(name_only, None)
+
+  if not status:
+    if required:
+      _Fail(tbv)
+    else:
+      return False
+
+  if max(tbv.api_versions) < constants.OS_API_V20:
+    return True
+
+  if constants.OS_VALIDATE_PARAMETERS in checks:
+    _CheckOSPList(tbv, osparams.keys())
+
+  validate_env = OSCoreEnv(tbv, osparams)
+  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
+                        cwd=tbv.path)
+  if result.failed:
+    logging.error("os validate command '%s' returned error: %s output: %s",
+                  result.cmd, result.fail_reason, result.output)
+    _Fail("OS validation script failed (%s), output: %s",
+          result.fail_reason, result.output, log=False)
+
+  return True
+
+
 def DemoteFromMC():
   """Demotes the current node from master candidate role.
 
@@ -2470,7 +2592,7 @@ def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
 
   """
   (key_pem, cert_pem) = \
-    utils.GenerateSelfSignedX509Cert(utils.HostInfo.SysName(),
+    utils.GenerateSelfSignedX509Cert(netutils.HostInfo.SysName(),
                                      min(validity, _MAX_SSL_CERT_VALIDITY))
 
   cert_dir = tempfile.mkdtemp(dir=cryptodir,
@@ -2525,6 +2647,7 @@ def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
   env = None
   prefix = None
   suffix = None
+  exp_size = None
 
   if ieio == constants.IEIO_FILE:
     (filename, ) = ieargs
@@ -2549,6 +2672,14 @@ def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
     elif mode == constants.IEM_EXPORT:
       suffix = "< %s" % quoted_filename
 
+      # Retrieve file size
+      try:
+        st = os.stat(filename)
+      except EnvironmentError, err:
+        logging.error("Can't stat(2) %s: %s", filename, err)
+      else:
+        exp_size = utils.BytesToMebibyte(st.st_size)
+
   elif ieio == constants.IEIO_RAW_DISK:
     (disk, ) = ieargs
 
@@ -2572,6 +2703,7 @@ def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
                                    real_disk.dev_path,
                                    str(1024 * 1024), # 1 MB
                                    str(disk.size))
+      exp_size = disk.size
 
   elif ieio == constants.IEIO_SCRIPT:
     (disk, disk_index, ) = ieargs
@@ -2602,10 +2734,13 @@ def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
     elif mode == constants.IEM_EXPORT:
       prefix = "%s |" % script_cmd
 
+    # Let script predict size
+    exp_size = constants.IE_CUSTOM_SIZE
+
   else:
     _Fail("Invalid %s I/O mode %r", mode, ieio)
 
-  return (env, prefix, suffix)
+  return (env, prefix, suffix, exp_size)
 
 
 def _CreateImportExportStatusDir(prefix):
@@ -2651,7 +2786,7 @@ def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
   if (opts.key_name is None) ^ (opts.ca_pem is None):
     _Fail("Cluster certificate can only be used for both key and CA")
 
-  (cmd_env, cmd_prefix, cmd_suffix) = \
+  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
 
   if opts.key_name is None:
@@ -2697,6 +2832,15 @@ def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
     if port:
       cmd.append("--port=%s" % port)
 
+    if opts.compress:
+      cmd.append("--compress=%s" % opts.compress)
+
+    if opts.magic:
+      cmd.append("--magic=%s" % opts.magic)
+
+    if exp_size is not None:
+      cmd.append("--expected-size=%s" % exp_size)
+
     if cmd_prefix:
       cmd.append("--cmd-prefix=%s" % cmd_prefix)
 
@@ -2762,7 +2906,7 @@ def AbortImportExport(name):
   if pid:
     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                  name, pid)
-    os.kill(pid, signal.SIGTERM)
+    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
 
 
 def CleanupImportExport(name):
@@ -2791,7 +2935,7 @@ def _FindDisks(nodes_ip, disks):
 
   """
   # set the correct physical ID
-  my_name = utils.HostInfo().name
+  my_name = netutils.HostInfo().name
   for cf in disks:
     cf.SetPhysicalID(my_name, nodes_ip)
 
@@ -2912,6 +3056,16 @@ def DrbdWaitSync(nodes_ip, disks):
   return (alldone, min_resync)
 
 
+def GetDrbdUsermodeHelper():
+  """Returns DRBD usermode helper currently configured.
+
+  """
+  try:
+    return bdev.BaseDRBD.GetUsermodeHelper()
+  except errors.BlockDeviceError, err:
+    _Fail(str(err))
+
+
 def PowercycleNode(hypervisor_type):
   """Hard-powercycle the node.