return ssh.SshRunner()
-def _GetMasterInfo():
- """Return the master ip and netdev.
+def _CleanDirectory(path, exclude=None):
+  """Removes all regular files in a directory.
+
+  Symlinks, subdirectories and anything listed in C{exclude} are left
+  untouched.
+
+  @param path: directory to clean; silently ignored if it doesn't exist
+  @param exclude: List of files to be excluded, defaults to the empty list.
+  @type exclude: list
+
+  """
+  if not os.path.isdir(path):
+    return
+
+  # A default of None instead of a mutable [] avoids sharing one list
+  # object across all calls.
+  if exclude is None:
+    exclude = []
+  else:
+    # Normalize excluded paths so they compare equal to the normalized
+    # full names computed below
+    exclude = [os.path.normpath(i) for i in exclude]
+
+  for rel_name in utils.ListVisibleFiles(path):
+    full_name = os.path.normpath(os.path.join(path, rel_name))
+    if full_name in exclude:
+      continue
+    if os.path.isfile(full_name) and not os.path.islink(full_name):
+      utils.RemoveFile(full_name)
+
+
+def _JobQueuePurge(keep_lock):
+  """Removes job queue files and archived jobs.
+
+  @param keep_lock: whether to leave the queue lock file in place
+
+  """
+  if keep_lock:
+    excluded = [constants.JOB_QUEUE_LOCK_FILE]
+  else:
+    excluded = []
+
+  _CleanDirectory(constants.QUEUE_DIR, exclude=excluded)
+  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
+
+
+def GetMasterInfo():
+  """Returns master information.
+
+  This is an utility function to compute master information, either
+  for consumption here or from the node daemon.
+
+  @rtype: tuple
+  @return: (master_netdev, master_ip, master_name), all elements None
+      if the cluster configuration is incomplete
   """
   try:
     ss = ssconf.SimpleStore()
     master_netdev = ss.GetMasterNetdev()
     master_ip = ss.GetMasterIP()
+    master_node = ss.GetMasterNode()
   except errors.ConfigurationError, err:
     logging.exception("Cluster configuration incomplete")
-    return (None, None)
+    # Keep the same arity as the success path, otherwise callers that
+    # unpack three values raise ValueError before they can check for
+    # the error sentinel.
+    return (None, None, None)
-  return (master_netdev, master_ip)
+  return (master_netdev, master_ip, master_node)
def StartMaster(start_daemons):
"""
ok = True
- master_netdev, master_ip = _GetMasterInfo()
+ master_netdev, master_ip, _ = GetMasterInfo()
if not master_netdev:
return False
stop the master daemons (ganet-masterd and ganeti-rapi).
"""
- master_netdev, master_ip = _GetMasterInfo()
+ master_netdev, master_ip, _ = GetMasterInfo()
if not master_netdev:
return False
"""Cleans up the current node and prepares it to be removed from the cluster.
"""
- if os.path.isdir(constants.DATA_DIR):
- for rel_name in utils.ListVisibleFiles(constants.DATA_DIR):
- full_name = os.path.join(constants.DATA_DIR, rel_name)
- if os.path.isfile(full_name) and not os.path.islink(full_name):
- utils.RemoveFile(full_name)
+ _CleanDirectory(constants.DATA_DIR)
+
+ # The lock can be removed because we're going to quit anyway.
+ _JobQueuePurge(keep_lock=False)
try:
priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
inst_os.path, create_script, instance.name,
real_os_dev.dev_path, real_swap_dev.dev_path,
logfile)
+ env = {'HYPERVISOR': ssconf.SimpleStore().GetHypervisorType()}
- result = utils.RunCmd(command)
+ result = utils.RunCmd(command, env=env)
if result.failed:
logging.error("os create command '%s' returned error: %s, logfile: %s,"
" output: %s", command, result.fail_reason, logfile,
constants.ETC_HOSTS,
constants.SSH_KNOWN_HOSTS_FILE,
constants.VNC_PASSWORD_FILE,
- constants.JOB_QUEUE_SERIAL_FILE,
]
allowed_files.extend(ssconf.SimpleStore().GetFileList())
+
if file_name not in allowed_files:
logging.error("Filename passed to UploadFile not in allowed"
" upload targets: '%s'", file_name)
config.set(constants.INISECT_INS, 'nic%d_mac' %
nic_count, '%s' % nic.mac)
config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
- config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count, '%s' % nic.bridge)
+ config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
+ '%s' % nic.bridge)
# TODO: redundant: on load can read nics until it doesn't exist
config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
logfile)
command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
+ env = {'HYPERVISOR': ssconf.SimpleStore().GetHypervisorType()}
- result = utils.RunCmd(command)
+ result = utils.RunCmd(command, env=env)
if result.failed:
logging.error("os import command '%s' returned error: %s"
return result
+def _IsJobQueueFile(file_name):
+  """Checks whether the given filename is in the queue directory.
+
+  @param file_name: path to check
+  @return: True if file_name lives under the queue directory
+
+  """
+  queue_dir = os.path.normpath(constants.QUEUE_DIR)
+  # os.path.commonprefix compares character-wise, so a sibling path
+  # such as "<queue_dir>2/x" would wrongly pass; require the separator
+  # after the directory prefix instead.
+  result = os.path.normpath(file_name).startswith(queue_dir + os.sep)
+
+  if not result:
+    logging.error("'%s' is not a file in the queue directory",
+                  file_name)
+
+  return result
+
+
+def JobQueueUpdate(file_name, content):
+  """Updates a file in the queue directory.
+
+  @param file_name: path of the file to update; must live in the queue
+      directory
+  @param content: full new contents for the file
+  @return: True on success, False if file_name is not inside the queue
+      directory
+
+  """
+  if not _IsJobQueueFile(file_name):
+    return False
+
+  # Write and replace the file atomically
+  utils.WriteFile(file_name, data=content)
+
+  return True
+
+
+def JobQueuePurge():
+  """Removes job queue files and archived jobs.
+
+  Public entry point: unlike node shutdown, the queue lock file is
+  preserved (see the comment below).
+
+  """
+  # The lock must not be removed, otherwise another process could create
+  # it again.
+  return _JobQueuePurge(keep_lock=True)
+
+
+def JobQueueRename(old, new):
+  """Renames a job queue file.
+
+  Both the source and the destination must be inside the queue
+  directory, otherwise nothing is renamed.
+
+  @return: True if the rename was performed, False otherwise
+
+  """
+  if _IsJobQueueFile(old) and _IsJobQueueFile(new):
+    os.rename(old, new)
+    return True
+
+  return False
+
+
def CloseBlockDevices(disks):
"""Closes the given block devices.