# global ignores
*.py[co]
+*.swp
# /
/Makefile
/*.tar.bz2
/*.tar.gz
+# daemons
+/daemons/ganeti-cleaner
+
# devel
/devel/clean-cluster
/devel/upload
devel \
doc \
doc/examples \
+ doc/examples/hooks \
lib \
lib/http \
lib/hypervisor \
CLEANFILES = \
autotools/replace_vars.sed \
+ daemons/ganeti-cleaner \
devel/upload \
doc/rapi-resources.gen \
doc/examples/bash_completion \
scripts/gnt-node \
scripts/gnt-os
+nodist_sbin_SCRIPTS = \
+ daemons/ganeti-cleaner
+
dist_tools_SCRIPTS = \
tools/burnin \
tools/cfgshell \
$(MAINTAINERCLEANFILES) \
NEWS \
DEVNOTES \
+ pylintrc \
autotools/docbook-wrapper \
+ daemons/ganeti-cleaner.in \
devel/upload.in \
$(docrst) \
$(docdot) \
doc/examples/ganeti.initd.in \
doc/examples/ganeti.cron.in \
doc/examples/dumb-allocator \
+ doc/examples/hooks/ethers \
doc/locking.txt \
test/testutils.py \
test/mocks.py \
$(REPLACE_VARS_SED)
sed -f $(REPLACE_VARS_SED) < $< > $@
+daemons/ganeti-cleaner: daemons/ganeti-cleaner.in stamp-directories \
+ $(REPLACE_VARS_SED)
+ sed -f $(REPLACE_VARS_SED) < $< > $@
+ chmod +x $@
+
doc/%.html: doc/%.rst
@test -n "$(RST2HTML)" || { echo 'rst2html' not found during configure; exit 1; }
$(RST2HTML) $< $@
--- /dev/null
+#!/bin/bash
+#
+
+# Copyright (C) 2009 Google Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+
+# Removes archived jobs from the Ganeti job queue once they are older
+# than REMOVE_AFTER days; intended to be run periodically from cron.
+
+set -e
+
+DATA_DIR=@LOCALSTATEDIR@/lib/ganeti
+QUEUE_ARCHIVE_DIR=$DATA_DIR/queue/archive
+
+# Define how many days archived jobs should be left alone
+REMOVE_AFTER=21
+
+# Exit if machine is not part of a cluster
+[[ -e $DATA_DIR/ssconf_master_node ]] || exit 0
+
+# Exit if queue archive directory doesn't exist
+[[ -d $QUEUE_ARCHIVE_DIR ]] || exit 0
+
+# Remove old jobs
+find "$QUEUE_ARCHIVE_DIR" -mindepth 2 -type f -mtime +$REMOVE_AFTER -print0 | \
+xargs -r0 rm -f
+
+exit 0
import os
-import errno
import sys
import SocketServer
import time
import collections
-import Queue
-import random
import signal
import logging
def send_message(self, msg):
#print "sending", msg
+ # TODO: sendall is not guaranteed to send everything
self.request.sendall(msg + self.EOM)
ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
return queue.SubmitJob(ops)
+ if method == luxi.REQ_SUBMIT_MANY_JOBS:
+ logging.info("Received multiple jobs")
+ jobs = []
+ for ops in args:
+ jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
+ return queue.SubmitManyJobs(jobs)
+
elif method == luxi.REQ_CANCEL_JOB:
job_id = args
logging.info("Received job cancel request for %s", job_id)
"""Main function"""
options, args = ParseOptions()
- utils.debug = options.debug
utils.no_fork = True
if options.fork:
rpc.Init()
try:
# activate ip
- master_node = ssconf.SimpleConfigReader().GetMasterNode()
+ master_node = ssconf.SimpleStore().GetMasterNode()
if not rpc.RpcRunner.call_node_start_master(master_node, False, False):
logging.error("Can't activate master IP address")
import os
import sys
-import traceback
import SocketServer
-import errno
import logging
import signal
global queue_lock
options, args = ParseOptions()
- utils.debug = options.debug
if options.fork:
utils.CloseFDs()
for fname in (constants.SSL_CERT_FILE,):
if not os.path.isfile(fname):
print "config %s not there, will not run." % fname
- sys.exit(5)
+ sys.exit(constants.EXIT_NOTCLUSTER)
- try:
- port = utils.GetNodeDaemonPort()
- except errors.ConfigurationError, err:
- print "Cluster configuration incomplete: '%s'" % str(err)
- sys.exit(5)
+ port = utils.GetNodeDaemonPort()
dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
dirs.append((constants.LOG_OS_DIR, 0750))
parser.add_option("-f", "--foreground", dest="fork",
help="Don't detach from the current terminal",
default=True, action="store_false")
+ parser.add_option("-b", "--bind", dest="bind_address",
+ help="Bind address",
+ default="", metavar="ADDRESS")
options, args = parser.parse_args()
if len(args) != 0:
print >> sys.stderr, "Usage: %s [-d] [-p port]" % sys.argv[0]
- sys.exit(1)
+ sys.exit(constants.EXIT_FAILURE)
if options.ssl and not (options.ssl_cert and options.ssl_key):
print >> sys.stderr, ("For secure mode please provide "
- "--ssl-key and --ssl-cert arguments")
- sys.exit(1)
+ "--ssl-key and --ssl-cert arguments")
+ sys.exit(constants.EXIT_FAILURE)
return options, args
ssl_cert_path=options.ssl_cert)
except Exception, err:
sys.stderr.write("Can't load the SSL certificate/key: %s\n" % (err,))
- sys.exit(1)
+ sys.exit(constants.EXIT_FAILURE)
else:
ssl_params = None
utils.WritePidFile(constants.RAPI_PID)
try:
mainloop = daemon.Mainloop()
- server = RemoteApiHttpServer(mainloop, "", options.port,
+ server = RemoteApiHttpServer(mainloop, options.bind_address, options.port,
ssl_params=ssl_params, ssl_verify_peer=False,
request_executor_class=
JsonErrorRequestExecutor)
PATH=/sbin:/bin:/usr/sbin:/usr/bin:/usr/local/sbin:/usr/local/bin
-# restart failed instances
+
+# Restart failed instances (every 5 minutes)
*/5 * * * * root [ -x @SBINDIR@/ganeti-watcher ] && @SBINDIR@/ganeti-watcher
+
+# Clean job archive (at 01:45 AM)
+45 1 * * * root [ -x @SBINDIR@/ganeti-cleaner ] && @SBINDIR@/ganeti-cleaner
# based on skeleton from Debian GNU/Linux
### BEGIN INIT INFO
# Provides: ganeti
-# Required-Start: $syslog $remote_fs xend
-# Required-Stop: $syslog $remote_fs xend
+# Required-Start: $syslog $remote_fs
+# Required-Stop: $syslog $remote_fs
# Default-Start: 2 3 4 5
# Default-Stop: 0 1 6
-# Short-Description: Ganeti Xen Cluster Manager
-# Description: Ganeti Xen Cluster Manager
+# Short-Description: Ganeti Cluster Manager
+# Description: Ganeti Cluster Manager
### END INIT INFO
PATH=/sbin:/bin:/usr/sbin:/usr/bin:/usr/local/sbin:/usr/local/bin
GANETI_DEFAULTS_FILE="@SYSCONFDIR@/default/ganeti"
-NODED_NAME="ganeti-noded"
-NODED="@PREFIX@/sbin/${NODED_NAME}"
-NODED_PID="${GANETIRUNDIR}/${NODED_NAME}.pid"
+NODED="ganeti-noded"
NODED_ARGS=""
-MASTERD_NAME="ganeti-masterd"
-MASTERD="@PREFIX@/sbin/${MASTERD_NAME}"
-MASTERD_PID="${GANETIRUNDIR}/${MASTERD_NAME}.pid"
+MASTERD="ganeti-masterd"
MASTERD_ARGS=""
-RAPI_NAME="ganeti-rapi"
-RAPI="@PREFIX@/sbin/${RAPI_NAME}"
-RAPI_PID="${GANETIRUNDIR}/${RAPI_NAME}.pid"
+RAPI="ganeti-rapi"
RAPI_ARGS=""
SCRIPTNAME="@SYSCONFDIR@/init.d/ganeti"
-test -f $NODED || exit 0
+test -f "@PREFIX@/sbin/$NODED" || exit 0
. /lib/lsb/init-functions
}
start_action() {
- # called as start_action daemon pidfile
+ # called as start_action daemon-name
local daemon="$1"; shift
- local pidfile="$1"; shift
log_action_begin_msg "$daemon"
- start-stop-daemon --start --quiet --exec "$daemon" --pidfile "$pidfile" \
+ start-stop-daemon --start --quiet \
+ --pidfile "${GANETIRUNDIR}/${daemon}.pid" \
+ --startas "@PREFIX@/sbin/$daemon" \
+ --oknodo \
-- "$@"
check_exitcode $?
}
stop_action() {
- # called as stop_action daemon pidfile
- log_action_begin_msg "$1"
+ # called as stop_action daemon-name
+ local daemon="$1"
+ log_action_begin_msg "$daemon"
start-stop-daemon --stop --quiet --oknodo \
- --retry 30 --pidfile "$2"
+ --retry 30 --pidfile "${GANETIRUNDIR}/${daemon}.pid"
check_exitcode $?
}
+maybe_do() {
+ requested="$1"; shift
+ action="$1"; shift
+ target="$1"
+ if [ -z "$requested" -o "$requested" = "$target" ]; then
+ $action "$@"
+ fi
+}
+
+if [ -n "$2" -a \
+ "$2" != "$NODED" -a \
+ "$2" != "$MASTERD" -a \
+ "$2" != "$RAPI" ]; then
+ log_failure_msg "Unknown daemon '$2' requested"
+ exit 1
+fi
case "$1" in
start)
- log_daemon_msg "Starting $DESC" "$NAME"
+ log_daemon_msg "Starting $DESC" "$2"
check_config
- start_action $NODED $NODED_PID $NODED_ARGS
- start_action $MASTERD $MASTERD_PID $MASTERD_ARGS
- start_action $RAPI $RAPI_PID $RAPI_ARGS
+ maybe_do "$2" start_action $NODED $NODED_ARGS
+ maybe_do "$2" start_action $MASTERD $MASTERD_ARGS
+ maybe_do "$2" start_action $RAPI $RAPI_ARGS
;;
stop)
- log_daemon_msg "Stopping $DESC" "$NAME"
- stop_action $RAPI $RAPI_PID
- stop_action $MASTERD $MASTERD_PID
- stop_action $NODED $NODED_PID
+ log_daemon_msg "Stopping $DESC" "$2"
+ maybe_do "$2" stop_action $RAPI
+ maybe_do "$2" stop_action $MASTERD
+ maybe_do "$2" stop_action $NODED
;;
restart|force-reload)
- log_daemon_msg "Reloading $DESC"
- stop_action $RAPI $RAPI_PID
- stop_action $MASTERD $MASTERD_PID
- stop_action $NODED $NODED_PID
+ log_daemon_msg "Reloading $DESC" "$2"
+ maybe_do "$2" stop_action $RAPI
+ maybe_do "$2" stop_action $MASTERD
+ maybe_do "$2" stop_action $NODED
check_config
- start_action $NODED $NODED_PID
- start_action $MASTERD $MASTERD_PID
- start_action $RAPI $RAPI_PID
+ maybe_do "$2" start_action $NODED $NODED_ARGS
+ maybe_do "$2" start_action $MASTERD $MASTERD_ARGS
+ maybe_do "$2" start_action $RAPI $RAPI_ARGS
;;
*)
log_success_msg "Usage: $SCRIPTNAME {start|stop|force-reload|restart}"
--- /dev/null
+#!/bin/bash
+
+# Copyright (C) 2009 Google Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+
+# This is an example ganeti hook that writes the instance mac addresses in the
+# node's /etc/ethers file. It will pick up the first nic connected to the
+# TARGET_BRIDGE bridge, and write it down with the syntax "MAC INSTANCE_NAME".
+
+# The hook will also send a HUP signal to the daemon whose PID is in
+# DAEMON_PID_FILE, so that it can load the new /etc/ethers file and use it.
+# This has been tested in conjunction with dnsmasq's dhcp implementation.
+
+# It will also remove any other occurrences of the same instance in the
+# aforementioned file. This hook supports the "instance-add", "instance-modify",
+# "instance-remove", and "instance-mirror-replace" ganeti post hook paths. To
+# install it add a symlink from those hooks' directories to where this file is
+# installed (with a mode which permits execution).
+
+# TARGET_BRIDGE: We'll only add the first nic which gets connected to this
+# bridge to /etc/ethers.
+TARGET_BRIDGE="br0"
+DAEMON_PID_FILE="/var/run/dnsmasq.pid"
+
+# In order to handle concurrent execution of this hook, we use the $LOCKFILE.
+# LOCKFILE_CREATE and LOCKFILE_REMOVE are the path names for the lockfile-progs
+# programs which we use as helpers.
+LOCKFILE="/var/lock/ganeti_ethers"
+LOCKFILE_CREATE="/usr/bin/lockfile-create"
+LOCKFILE_REMOVE="/usr/bin/lockfile-remove"
+
+hooks_path=$GANETI_HOOKS_PATH
+[ -n "$hooks_path" ] || exit 1
+instance=$GANETI_INSTANCE_NAME
+[ -n "$instance" ] || exit 1
+nic_count=$GANETI_INSTANCE_NIC_COUNT
+
+acquire_lockfile() {
+ $LOCKFILE_CREATE $LOCKFILE || exit 1
+ trap "$LOCKFILE_REMOVE $LOCKFILE" EXIT
+}
+
+update_ethers_from_new() {
+ chmod 644 /etc/ethers.new
+ mv /etc/ethers.new /etc/ethers
+ [ -f "$DAEMON_PID_FILE" ] && kill -HUP $(< $DAEMON_PID_FILE)
+}
+
+if [ "$hooks_path" = "instance-add" -o \
+ "$hooks_path" = "instance-modify" -o \
+ "$hooks_path" = "instance-mirror-replace" ]
+then
+ for i in $(seq 0 $((nic_count - 1)) ); do
+ bridge_var="GANETI_INSTANCE_NIC${i}_BRIDGE"
+ bridge=${!bridge_var}
+ if [ -n "$bridge" -a "$bridge" = "$TARGET_BRIDGE" ]; then
+ mac_var="GANETI_INSTANCE_NIC${i}_MAC"
+ mac=${!mac_var}
+ acquire_lockfile
+ cat /etc/ethers | awk -- "! /^([[:xdigit:]:]*)[[:blank:]]+$instance\>/;
+ END {print \"$mac\t$instance\"}" > /etc/ethers.new
+ update_ethers_from_new
+ break
+ fi
+ done
+fi
+if [ "$hooks_path" = "instance-remove" -o \
+ \( "$hooks_path" = "instance-modify" -a "$nic_count" -eq 0 \) ]; then
+ acquire_lockfile
+ cat /etc/ethers | awk -- "! /^([[:xdigit:]:]*)[[:blank:]]+$instance\>/" \
+ > /etc/ethers.new
+ update_ethers_from_new
+fi
+
be left
-All informations about the cluster is passed using environment
+All information about the cluster is passed using environment
variables. Different operations will have sligthly different
environments, but most of the variables are common.
nodes
dictionary with the data for the nodes in the cluster, indexed by
- the node name; the dict contains:
+ the node name; the dict contains [*]_ :
total_disk
the total disk size of this node (mebibytes)
or ``offline`` flags set. More details about these of node status
flags is available in the manpage *ganeti(7)*.
+.. [*] Note that no run-time data is present for offline or drained nodes;
+ this means the tags total_memory, reserved_memory, free_memory, total_disk,
+ free_disk, total_cpus, i_pri_memory and i_pri_up_memory will be absent
-Respone message
-~~~~~~~~~~~~~~~
+
+Response message
+~~~~~~~~~~~~~~~~
The response message is much more simple than the input one. It is
also a dict having three keys:
success
- a boolean value denoting if the allocation was successfull or not
+ a boolean value denoting if the allocation was successful or not
info
a string with information from the scripts; if the allocation fails,
# 02110-1301, USA.
-"""Functions used by the node daemon"""
+"""Functions used by the node daemon
+
+@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
+ the L{UploadFile} function
+
+"""
import os
utils.RemoveFile(full_name)
+def _BuildUploadFileList():
+ """Build the list of allowed upload files.
+
+ This is abstracted so that it's built only once at module import time.
+
+ """
+ return frozenset([
+ constants.CLUSTER_CONF_FILE,
+ constants.ETC_HOSTS,
+ constants.SSH_KNOWN_HOSTS_FILE,
+ constants.VNC_PASSWORD_FILE,
+ ])
+
+
+_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
+
+
def JobQueuePurge():
"""Removes job queue files and archived jobs.
master_netdev = cfg.GetMasterNetdev()
master_ip = cfg.GetMasterIP()
master_node = cfg.GetMasterNode()
- except errors.ConfigurationError, err:
+ except errors.ConfigurationError:
logging.exception("Cluster configuration incomplete")
return (None, None, None)
return (master_netdev, master_ip, master_node)
def GetNodeInfo(vgname, hypervisor_type):
- """Gives back a hash with different informations about the node.
+ """Gives back a hash with different information about the node.
@type vgname: C{string}
@param vgname: the name of the volume group to ask for disk space information
try:
names = hypervisor.GetHypervisor(hname).ListInstances()
results.extend(names)
- except errors.HypervisorError, err:
+ except errors.HypervisorError:
logging.exception("Error enumerating instances for hypevisor %s", hname)
raise
def GetInstanceInfo(instance, hname):
- """Gives back the informations about an instance as a dictionary.
+ """Gives back the information about an instance as a dictionary.
@type instance: string
@param instance: the instance name
def _GetVGInfo(vg_name):
- """Get informations about the volume group.
+ """Get information about the volume group.
@type vg_name: str
@param vg_name: the volume group which we query
# test every 10secs for 2min
time.sleep(1)
- for dummy in range(11):
+ for _ in range(11):
if instance.name not in GetInstanceList([hv_name]):
break
time.sleep(10)
msg = "Failed to accept instance"
logging.exception(msg)
return (False, '%s: %s' % (msg, err))
- return (True, "Accept successfull")
+ return (True, "Accept successful")
def FinalizeMigration(instance, info, success):
msg = "Failed to migrate instance"
logging.exception(msg)
return (False, "%s: %s" % (msg, err))
- return (True, "Migration successfull")
+ return (True, "Migration successful")
def BlockdevCreate(disk, size, owner, on_primary, info):
def BlockdevShutdown(disk):
"""Shut down a block device.
- First, if the device is assembled (Attach() is successfull), then
+ First, if the device is assembled (Attach() is successful), then
the device is shutdown. Then the children of the device are
shutdown.
def _RecursiveFindBD(disk):
"""Check if a device is activated.
- If so, return informations about the real device.
+ If so, return information about the real device.
@type disk: L{objects.Disk}
@param disk: the disk object we need to find
def BlockdevFind(disk):
"""Check if a device is activated.
- If it is, return informations about the real device.
+ If it is, return information about the real device.
@type disk: L{objects.Disk}
@param disk: the disk to find
file_name)
return False
- allowed_files = [
- constants.CLUSTER_CONF_FILE,
- constants.ETC_HOSTS,
- constants.SSH_KNOWN_HOSTS_FILE,
- constants.VNC_PASSWORD_FILE,
- ]
-
- if file_name not in allowed_files:
+ if file_name not in _ALLOWED_UPLOAD_FILES:
logging.error("Filename passed to UploadFile not in allowed"
" upload targets: '%s'", file_name)
return False
# but we don't have the owner here - maybe parse from existing
# cache? for now, we only lose lvm data when we rename, which
# is less critical than DRBD or MD
- except errors.BlockDeviceError, err:
+ except errors.BlockDeviceError:
logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
result = False
return result
@param file_storage_dir: the directory we should cleanup
@rtype: tuple (success,)
@return: tuple of one element, C{success}, denoting
- whether the operation was successfull
+ whether the operation was successful
"""
file_storage_dir = _TransformFileStorageDir(file_storage_dir)
# deletes dir only if empty, otherwise we want to return False
try:
os.rmdir(file_storage_dir)
- except OSError, err:
+ except OSError:
logging.exception("Cannot remove file storage directory '%s'",
file_storage_dir)
result = False,
if os.path.isdir(old_file_storage_dir):
try:
os.rename(old_file_storage_dir, new_file_storage_dir)
- except OSError, err:
+ except OSError:
logging.exception("Cannot rename '%s' to '%s'",
old_file_storage_dir, new_file_storage_dir)
result = False,
dir_name = "%s/%s" % (self._BASE_DIR, subdir)
try:
dir_contents = utils.ListVisibleFiles(dir_name)
- except OSError, err:
+ except OSError:
# FIXME: must log output in case of failures
return rr
fdata = "%s %s %s\n" % (str(owner), state, iv_name)
try:
utils.WriteFile(fpath, data=fdata)
- except EnvironmentError, err:
+ except EnvironmentError:
logging.exception("Can't update bdev cache for %s", dev_path)
@classmethod
fpath = cls._ConvertPath(dev_path)
try:
utils.RemoveFile(fpath)
- except EnvironmentError, err:
+ except EnvironmentError:
logging.exception("Can't update bdev cache for %s", dev_path)
"""Remove this device.
This makes sense only for some of the device types: LV and file
- storeage. Also note that if the device can't attach, the removal
+ storage. Also note that if the device can't attach, the removal
can't be completed.
"""
def Assemble(self):
"""Assemble the device.
- We alway run `lvchange -ay` on the LV to ensure it's active before
+ We always run `lvchange -ay` on the LV to ensure it's active before
use, as there were cases when xenvg was not active after boot
(also possibly after disk issues).
If sync_percent is None, it means all is ok
- If estimated_time is None, it means we can't esimate
+ If estimated_time is None, it means we can't estimate
the time needed, otherwise it's the time left in seconds.
We set the is_degraded parameter to True on two conditions:
network not connected or local disk missing.
- We compute the ldisk parameter based on wheter we have a local
+ We compute the ldisk parameter based on whether we have a local
disk or not.
@rtype: tuple
ever_disconnected = _IgnoreError(self._ShutdownNet, self.minor)
timeout_limit = time.time() + self._NET_RECONFIG_TIMEOUT
- sleep_time = 0.100 # we start the retry time at 100 miliseconds
+ sleep_time = 0.100 # we start the retry time at 100 milliseconds
while time.time() < timeout_limit:
status = self.GetProcStatus()
if status.is_standalone:
break
# retry the disconnect, it seems possible that due to a
# well-time disconnect on the peer, my disconnect command might
- # be ingored and forgotten
+ # be ignored and forgotten
ever_disconnected = _IgnoreError(self._ShutdownNet, self.minor) or \
ever_disconnected
time.sleep(sleep_time)
def Shutdown(self):
"""Shutdown the device.
- This is a no-op for the file type, as we don't deacivate
+ This is a no-op for the file type, as we don't deactivate
the file on shutdown.
"""
"""
(fd, tmp_file_name) = tempfile.mkstemp(dir=os.path.dirname(file_name))
try:
- # Set permissions before writing key
- os.chmod(tmp_file_name, 0600)
-
- result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
- "-days", str(validity), "-nodes", "-x509",
- "-keyout", tmp_file_name, "-out", tmp_file_name,
- "-batch"])
- if result.failed:
- raise errors.OpExecError("Could not generate SSL certificate, command"
- " %s had exitcode %s and error message %s" %
- (result.cmd, result.exit_code, result.output))
-
- # Make read-only
- os.chmod(tmp_file_name, 0400)
-
- os.rename(tmp_file_name, file_name)
+ try:
+ # Set permissions before writing key
+ os.chmod(tmp_file_name, 0600)
+
+ result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
+ "-days", str(validity), "-nodes", "-x509",
+ "-keyout", tmp_file_name, "-out", tmp_file_name,
+ "-batch"])
+ if result.failed:
+ raise errors.OpExecError("Could not generate SSL certificate, command"
+ " %s had exitcode %s and error message %s" %
+ (result.cmd, result.exit_code, result.output))
+
+ # Make read-only
+ os.chmod(tmp_file_name, 0400)
+
+ os.rename(tmp_file_name, file_name)
+ finally:
+ utils.RemoveFile(tmp_file_name)
finally:
- utils.RemoveFile(tmp_file_name)
+ os.close(fd)
def _InitGanetiServerSetup():
def InitCluster(cluster_name, mac_prefix, def_bridge,
master_netdev, file_storage_dir, candidate_pool_size,
secondary_ip=None, vg_name=None, beparams=None, hvparams=None,
- enabled_hypervisors=None, default_hypervisor=None):
+ enabled_hypervisors=None, default_hypervisor=None,
+ modify_etc_hosts=True):
"""Initialise the cluster.
@type candidate_pool_size: int
if config.ConfigWriter.IsCluster():
raise errors.OpPrereqError("Cluster is already initialised")
+ if not enabled_hypervisors:
+ raise errors.OpPrereqError("Enabled hypervisors list must contain at"
+ " least one member")
+ invalid_hvs = set(enabled_hypervisors) - constants.HYPER_TYPES
+ if invalid_hvs:
+ raise errors.OpPrereqError("Enabled hypervisors contains invalid"
+ " entries: %s" % invalid_hvs)
+
hostname = utils.HostInfo()
if hostname.ip.startswith("127."):
f.close()
sshkey = sshline.split(" ")[1]
- utils.AddHostToEtcHosts(hostname.name)
+ if modify_etc_hosts:
+ utils.AddHostToEtcHosts(hostname.name)
+
_InitSSHSetup()
# init of cluster config file
beparams={constants.BEGR_DEFAULT: beparams},
hvparams=hvparams,
candidate_pool_size=candidate_pool_size,
+ modify_etc_hosts=modify_etc_hosts,
)
master_node_config = objects.Node(name=hostname.name,
primary_ip=hostname.ip,
@type node_list: list
@param node_list: the list of nodes to query for master info; the current
- node wil be removed if it is in the list
+ node will be removed if it is in the list
@rtype: list
@return: list of (node, votes)
from optparse import (OptionParser, make_option, TitledHelpFormatter,
Option, OptionValueError)
+
__all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain",
"SubmitOpCode", "GetClient",
"cli_option", "ikv_option", "keyval_option",
]
+
def _ExtractTagsObject(opts, args):
"""Extract the tag type object.
result = list(result)
result.sort()
for tag in result:
- print tag
+ ToStdout(tag)
def AddTags(opts, args):
def _ParseArgs(argv, commands, aliases):
"""Parser for the command line arguments.
- This function parses the arguements and returns the function which
+ This function parses the arguments and returns the function which
must be executed together with its (modified) arguments.
@param argv: the command line
binary = argv[0].split("/")[-1]
if len(argv) > 1 and argv[1] == "--version":
- print "%s (ganeti) %s" % (binary, constants.RELEASE_VERSION)
+ ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
# Quit right away. That way we don't have to care about this special
# argument. optparse.py does it the same.
sys.exit(0)
# let's do a nice thing
sortedcmds = commands.keys()
sortedcmds.sort()
- print ("Usage: %(bin)s {command} [options...] [argument...]"
- "\n%(bin)s <command> --help to see details, or"
- " man %(bin)s\n" % {"bin": binary})
+
+ ToStdout("Usage: %s {command} [options...] [argument...]", binary)
+ ToStdout("%s <command> --help to see details, or man %s", binary, binary)
+ ToStdout("")
+
# compute the max line length for cmd + usage
mlen = max([len(" %s" % cmd) for cmd in commands])
mlen = min(60, mlen) # should not get here...
+
# and format a nice command list
- print "Commands:"
+ ToStdout("Commands:")
for cmd in sortedcmds:
cmdstr = " %s" % (cmd,)
help_text = commands[cmd][4]
- help_lines = textwrap.wrap(help_text, 79-3-mlen)
- print "%-*s - %s" % (mlen, cmdstr, help_lines.pop(0))
+ help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
+ ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
for line in help_lines:
- print "%-*s %s" % (mlen, "", line)
- print
+ ToStdout("%-*s %s", mlen, "", line)
+
+ ToStdout("")
+
return None, None, None
# get command, unalias it, and look it up in commands
options, args = parser.parse_args()
if nargs is None:
if len(args) != 0:
- print >> sys.stderr, ("Error: Command %s expects no arguments" % cmd)
+ ToStderr("Error: Command %s expects no arguments", cmd)
return None, None, None
elif nargs < 0 and len(args) != -nargs:
- print >> sys.stderr, ("Error: Command %s expects %d argument(s)" %
- (cmd, -nargs))
+ ToStderr("Error: Command %s expects %d argument(s)", cmd, -nargs)
return None, None, None
elif nargs >= 0 and len(args) < nargs:
- print >> sys.stderr, ("Error: Command %s expects at least %d argument(s)" %
- (cmd, nargs))
+ ToStderr("Error: Command %s expects at least %d argument(s)", cmd, nargs)
return None, None, None
return func, options, args
choices = [('y', True, 'Perform the operation'),
('n', False, 'Do not perform the operation')]
if not choices or not isinstance(choices, list):
- raise errors.ProgrammerError("Invalid choiches argument to AskUser")
+ raise errors.ProgrammerError("Invalid choices argument to AskUser")
for entry in choices:
if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
- raise errors.ProgrammerError("Invalid choiches element to AskUser")
+ raise errors.ProgrammerError("Invalid choices element to AskUser")
answer = choices[-1][1]
new_text = []
feedback_fn(log_entry[1:])
else:
encoded = utils.SafeEncode(message)
- print "%s %s" % (time.ctime(utils.MergeTime(timestamp)), encoded)
+ ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded)
prev_logmsg_serial = max(prev_logmsg_serial, serial)
# TODO: Handle canceled and archived jobs
utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
stderr_logging=True, program=binary)
- utils.debug = options.debug
-
if old_cmdline:
logging.info("run with arguments '%s'", old_cmdline)
else:
except (errors.GenericError, luxi.ProtocolError,
JobSubmittedException), err:
result, err_msg = FormatError(err)
- logging.exception("Error durring command processing")
+ logging.exception("Error during command processing")
ToStderr(err_msg)
return result
cl = GetClient()
self.cl = cl
self.verbose = verbose
+ self.jobs = []
def QueueJob(self, name, *ops):
- """Submit a job for execution.
+ """Record a job for later submit.
@type name: string
@param name: a description of the job, will be used in WaitJobSet
"""
- job_id = SendJob(ops, cl=self.cl)
- self.queue.append((job_id, name))
+ self.queue.append((name, ops))
+
+ def SubmitPending(self):
+ """Submit all pending jobs.
+
+ """
+ results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
+ for ((status, data), (name, _)) in zip(results, self.queue):
+ self.jobs.append((status, data, name))
def GetResults(self):
"""Wait for and return the results of all jobs.
there will be the error message
"""
+ if not self.jobs:
+ self.SubmitPending()
results = []
if self.verbose:
- ToStdout("Submitted jobs %s", ", ".join(row[0] for row in self.queue))
- for jid, name in self.queue:
+ ok_jobs = [row[1] for row in self.jobs if row[0]]
+ if ok_jobs:
+ ToStdout("Submitted jobs %s", ", ".join(ok_jobs))
+ for submit_status, jid, name in self.jobs:
+ if not submit_status:
+ ToStderr("Failed to submit job for %s: %s", name, jid)
+ results.append((False, jid))
+ continue
if self.verbose:
ToStdout("Waiting for job %s for %s...", jid, name)
try:
if wait:
return self.GetResults()
else:
- for jid, name in self.queue:
- ToStdout("%s: %s", jid, name)
+ if not self.jobs:
+ self.SubmitPending()
+ for status, result, name in self.jobs:
+ if status:
+ ToStdout("%s: %s", result, name)
+ else:
+ ToStderr("Failure for %s: %s", name, result)
import os
import os.path
import time
-import tempfile
import re
import platform
import logging
import copy
-import random
from ganeti import ssh
from ganeti import utils
from ganeti import locking
from ganeti import constants
from ganeti import objects
-from ganeti import opcodes
from ganeti import serializer
from ganeti import ssconf
def __init__(self, processor, op, context, rpc):
"""Constructor for LogicalUnit.
- This needs to be overriden in derived classes in order to check op
+ This needs to be overridden in derived classes in order to check op
validity.
"""
CheckPrereq, doing these separate is better because:
- ExpandNames is left as as purely a lock-related function
- - CheckPrereq is run after we have aquired locks (and possible
+ - CheckPrereq is run after we have acquired locks (and possible
waited for them)
The function is allowed to change the self.op attribute so that
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
memory, vcpus, nics, disk_template, disks,
- bep, hvp, hypervisor):
+ bep, hvp, hypervisor_name):
"""Builds instance related env variables for hooks
This builds the hook environment from individual variables.
@param nics: list of tuples (ip, bridge, mac) representing
the NICs the instance has
@type disk_template: string
- @param disk_template: the distk template of the instance
+ @param disk_template: the disk template of the instance
@type disks: list
@param disks: the list of (size, mode) pairs
@type bep: dict
@param bep: the backend parameters for the instance
@type hvp: dict
@param hvp: the hypervisor parameters for the instance
- @type hypervisor: string
- @param hypervisor: the hypervisor for the instance
+ @type hypervisor_name: string
+ @param hypervisor_name: the hypervisor for the instance
@rtype: dict
@return: the hook environment for this instance
"INSTANCE_MEMORY": memory,
"INSTANCE_VCPUS": vcpus,
"INSTANCE_DISK_TEMPLATE": disk_template,
- "INSTANCE_HYPERVISOR": hypervisor,
+ "INSTANCE_HYPERVISOR": hypervisor_name,
}
if nics:
'disks': [(disk.size, disk.mode) for disk in instance.disks],
'bep': bep,
'hvp': hvp,
- 'hypervisor': instance.hypervisor,
+ 'hypervisor_name': instance.hypervisor,
}
if override:
args.update(override)
def _CheckInstanceBridgesExist(lu, instance):
- """Check that the brigdes needed by an instance exist.
+ """Check that the bridges needed by an instance exist.
"""
- # check bridges existance
+ # check bridges existence
brlist = [nic.bridge for nic in instance.nics]
result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
result.Raise()
This checks whether the cluster is empty.
- Any errors are signalled by raising errors.OpPrereqError.
+ Any errors are signaled by raising errors.OpPrereqError.
"""
master = self.cfg.GetMasterNode()
Test list:
- compares ganeti version
- - checks vg existance and size > 20G
+ - checks vg existence and size > 20G
- checks config file checksum
- checks ssh to other nodes
if bep[constants.BE_AUTO_BALANCE]:
needed_mem += bep[constants.BE_MEMORY]
if nodeinfo['mfree'] < needed_mem:
- feedback_fn(" - ERROR: not enough memory on node %s to accomodate"
+ feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
" failovers should node %s fail" % (node, prinode))
bad = True
return bad
def BuildHooksEnv(self):
"""Build hooks env.
- Cluster-Verify hooks just rone in the post phase and their failure makes
+ Cluster-Verify hooks just ran in the post phase and their failure makes
the output be logged in the verify output and the verification to fail.
"""
return not bad
def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
- """Analize the post-hooks' result
+ """Analyze the post-hooks' result
This method analyses the hook result, handles it, and sends some
nicely-formatted feedback back to the user.
node_lvs = self.rpc.call_volume_list(nodes, vg_name)
- to_act = set()
for node in nodes:
# node_volume
lvs = node_lvs[node]
@type disk: L{objects.Disk}
@param disk: the disk to check
- @rtype: booleean
+ @rtype: boolean
@return: boolean indicating whether a LD_LV dev_type was found or not
"""
if self.op.enabled_hypervisors is not None:
self.hv_list = self.op.enabled_hypervisors
+ if not self.hv_list:
+ raise errors.OpPrereqError("Enabled hypervisors list must contain at"
+ " least one member")
+ invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
+ if invalid_hvs:
+ raise errors.OpPrereqError("Enabled hypervisors contains invalid"
+ " entries: %s" % invalid_hvs)
else:
self.hv_list = cluster.enabled_hypervisors
- it does not have primary or secondary instances
- it's not the master
- Any errors are signalled by raising errors.OpPrereqError.
+ Any errors are signaled by raising errors.OpPrereqError.
"""
node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
- it is resolvable
- its parameters (single/dual homed) matches the cluster
- Any errors are signalled by raising errors.OpPrereqError.
+ Any errors are signaled by raising errors.OpPrereqError.
"""
node_name = self.op.node_name
raise errors.OpPrereqError("The master has a private ip but the"
" new node doesn't have one")
- # checks reachablity
+ # checks reachability
if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
raise errors.OpPrereqError("Node not reachable by ping")
" new node: %s" % msg)
# Add node to our /etc/hosts, and add key to known_hosts
- utils.AddHostToEtcHosts(new_node.name)
+ if self.cfg.GetClusterInfo().modify_etc_hosts:
+ utils.AddHostToEtcHosts(new_node.name)
if new_node.secondary_ip != new_node.primary_ip:
result = self.rpc.call_node_has_ip_address(new_node.name,
"""
node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
+ if (self.op.master_candidate is not None or
+ self.op.drained is not None or
+ self.op.offline is not None):
+ # we can't change the master's node flags
+ if self.op.node_name == self.cfg.GetMasterNode():
+ raise errors.OpPrereqError("The master role can be changed"
+ " only via masterfailover")
+
if ((self.op.master_candidate == False or self.op.offline == True or
self.op.drained == True) and node.master_candidate):
- # we will demote the node from master_candidate
- if self.op.node_name == self.cfg.GetMasterNode():
- raise errors.OpPrereqError("The master node has to be a"
- " master candidate, online and not drained")
cp_size = self.cfg.GetClusterInfo().candidate_pool_size
num_candidates, _ = self.cfg.GetMasterCandidateStats()
if num_candidates <= cp_size:
"master": cluster.master_node,
"default_hypervisor": cluster.default_hypervisor,
"enabled_hypervisors": cluster.enabled_hypervisors,
- "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
- for hypervisor in cluster.enabled_hypervisors]),
+ "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
+ for hypervisor_name in cluster.enabled_hypervisors]),
"beparams": cluster.beparams,
"candidate_pool_size": cluster.candidate_pool_size,
"default_bridge": cluster.default_bridge,
"master_netdev": cluster.master_netdev,
"volume_group_name": cluster.volume_group_name,
"file_storage_dir": cluster.file_storage_dir,
+ "tags": list(cluster.GetTags()),
}
return result
"""Start the disks of an instance.
"""
- disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
+ disks_ok, _ = _AssembleInstanceDisks(lu, instance,
ignore_secondaries=force)
if not disks_ok:
_ShutdownInstanceDisks(lu, instance)
_CheckNodeOnline(self, instance.primary_node)
bep = self.cfg.GetClusterInfo().FillBE(instance)
- # check bridges existance
+ # check bridges existence
_CheckInstanceBridgesExist(self, instance)
remote_info = self.rpc.call_instance_info(instance.primary_node,
_CheckNodeOnline(self, instance.primary_node)
- # check bridges existance
+ # check bridges existence
_CheckInstanceBridgesExist(self, instance)
def Exec(self, feedback_fn):
self.LogInfo("Not checking memory on the secondary node as"
" instance will not be started")
- # check bridge existance
+ # check bridge existence
brlist = [nic.bridge for nic in instance.nics]
result = self.rpc.call_bridges_exist(target_node, brlist)
result.Raise()
logging.info("Starting instance %s on node %s",
instance.name, target_node)
- disks_ok, dummy = _AssembleInstanceDisks(self, instance,
+ disks_ok, _ = _AssembleInstanceDisks(self, instance,
ignore_secondaries=True)
if not disks_ok:
_ShutdownInstanceDisks(self, instance)
instance.name, i_be[constants.BE_MEMORY],
instance.hypervisor)
- # check bridge existance
+ # check bridge existence
brlist = [nic.bridge for nic in instance.nics]
result = self.rpc.call_bridges_exist(target_node, brlist)
if result.failed or not result.data:
if len(secondary_nodes) != 0:
raise errors.ProgrammerError("Wrong template configuration")
- names = _GenerateUniqueNames(lu, [".disk%d" % i
+ names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
for i in range(disk_count)])
for idx, disk in enumerate(disk_info):
disk_index = idx + base_index
[primary_node, remote_node] * len(disk_info), instance_name)
names = []
- for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
+ for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
for i in range(disk_count)]):
names.append(lv_prefix + "_data")
names.append(lv_prefix + "_meta")
if not utils.IsValidMac(mac.lower()):
raise errors.OpPrereqError("Invalid MAC address specified: %s" %
mac)
+ else:
+ # or validate/reserve the current one
+ if self.cfg.IsMacInUse(mac):
+ raise errors.OpPrereqError("MAC address %s already in use"
+ " in cluster" % mac)
+
# bridge verification
bridge = nic.get("bridge", None)
if bridge is None:
disks=[(d["size"], d["mode"]) for d in self.disks],
bep=self.be_full,
hvp=self.hv_full,
- hypervisor=self.op.hypervisor,
+ hypervisor_name=self.op.hypervisor,
))
nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
logging.debug("Allocated minors %s" % (minors,))
self.proc.LogStep(4, steps_total, "changing drbd configuration")
for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
- size = dev.size
info("activating a new drbd on %s for disk/%d" % (new_node, idx))
# create new devices on new_node; note that we create two IDs:
# one without port, so the drbd will be activated without
This only checks the instance list against the existing names.
"""
- force = self.force = self.op.force
+ self.force = self.op.force
# checking the new params on the primary/secondary nodes
# remove it from its current node. In the future we could fix this by:
# - making a tasklet to search (share-lock all), then create the new one,
# then one to remove, after
- # - removing the removal operation altoghether
+ # - removing the removal operation altogether
self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
def DeclareLocks(self, level):
"master_candidate": ninfo.master_candidate,
}
- if not ninfo.offline:
+ if not (ninfo.offline or ninfo.drained):
nresult.Raise()
if not isinstance(nresult.data, dict):
raise errors.OpExecError("Can't get data for node %s" % nname)
"""
if call_fn is None:
call_fn = self.lu.rpc.call_iallocator_runner
- data = self.in_text
result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
result.Raise()
data = self._config_data
seen_lids = []
seen_pids = []
+
+ # global cluster checks
+ if not data.cluster.enabled_hypervisors:
+ result.append("enabled hypervisors list doesn't have any entries")
+ invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES
+ if invalid_hvs:
+ result.append("enabled hypervisors contains invalid entries: %s" %
+ invalid_hvs)
+
+ if data.cluster.master_node not in data.nodes:
+ result.append("cluster has invalid primary node '%s'" %
+ data.cluster.master_node)
+
+ # per-instance checks
for instance_name in data.instances:
instance = data.instances[instance_name]
if instance.primary_node not in data.nodes:
def _AppendUsedPorts(instance_name, disk, used):
duplicates = []
if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
- nodeA, nodeB, dummy, minorA, minorB = disk.logical_id[:5]
- for node, port in ((nodeA, minorA), (nodeB, minorB)):
+ node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
+ for node, port in ((node_a, minor_a), (node_b, minor_b)):
assert node in used, ("Node '%s' of instance '%s' not found"
" in node list" % (node, instance_name))
if port in used[node]:
self._config_data.instances.keys())
def _UnlockedGetInstanceInfo(self, instance_name):
- """Returns informations about an instance.
+ """Returns information about an instance.
This function is for internal use, when the config lock is already held.
@locking.ssynchronized(_config_lock, shared=1)
def GetInstanceInfo(self, instance_name):
- """Returns informations about an instance.
+ """Returns information about an instance.
- It takes the information from the configuration file. Other informations of
+ It takes the information from the configuration file. Other information of
an instance are taken from the live systems.
@param instance_name: name of the instance, e.g.
constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
}
- @locking.ssynchronized(_config_lock)
- def InitConfig(self, version, cluster_config, master_node_config):
- """Create the initial cluster configuration.
-
- It will contain the current node, which will also be the master
- node, and no instances.
-
- @type version: int
- @param version: Configuration version
- @type cluster_config: objects.Cluster
- @param cluster_config: Cluster configuration
- @type master_node_config: objects.Node
- @param master_node_config: Master node configuration
-
- """
- nodes = {
- master_node_config.name: master_node_config,
- }
-
- self._config_data = objects.ConfigData(version=version,
- cluster=cluster_config,
- nodes=nodes,
- instances={},
- serial_no=1)
- self._WriteConfig()
-
@locking.ssynchronized(_config_lock, shared=1)
def GetVGName(self):
"""Return the volume group name.
@locking.ssynchronized(_config_lock, shared=1)
def GetClusterInfo(self):
- """Returns informations about the cluster
+ """Returns information about the cluster
@rtype: L{objects.Cluster}
@return: the cluster object
# common exit codes
EXIT_SUCCESS = 0
EXIT_FAILURE = 1
+EXIT_NOTCLUSTER = 5
EXIT_NOTMASTER = 11
EXIT_NODESETUP_ERROR = 12
EXIT_CONFIRMATION = 13 # need user confirmation
HV_ROOT_PATH = "root_path"
HV_SERIAL_CONSOLE = "serial_console"
HV_USB_MOUSE = "usb_mouse"
+HV_DEVICE_MODEL = "device_model"
HVS_PARAMETER_TYPES = {
HV_BOOT_ORDER: VTYPE_STRING,
HV_ROOT_PATH: VTYPE_STRING,
HV_SERIAL_CONSOLE: VTYPE_BOOL,
HV_USB_MOUSE: VTYPE_STRING,
+ HV_DEVICE_MODEL: VTYPE_STRING,
}
HVS_PARAMETERS = frozenset(HVS_PARAMETER_TYPES.keys())
JOB_STATUS_SUCCESS = "success"
JOB_STATUS_ERROR = "error"
+# OpCode status
+# not yet finalized
OP_STATUS_QUEUED = "queued"
OP_STATUS_WAITLOCK = "waiting"
OP_STATUS_CANCELING = "canceling"
OP_STATUS_RUNNING = "running"
+# finalized
OP_STATUS_CANCELED = "canceled"
OP_STATUS_SUCCESS = "success"
OP_STATUS_ERROR = "error"
+OPS_FINALIZED = frozenset([OP_STATUS_CANCELED,
+ OP_STATUS_SUCCESS,
+ OP_STATUS_ERROR])
# Execution log types
ELOG_MESSAGE = "message"
ELOG_PROGRESS = "progress"
-# Temporary RAPI constants until we have cluster parameters
-RAPI_ENABLE = True
RAPI_PORT = 5080
# max dynamic devices
HV_VNC_BIND_ADDRESS: '0.0.0.0',
HV_ACPI: True,
HV_PAE: True,
+ HV_KERNEL_PATH: "/usr/lib/xen/boot/hvmloader",
+ HV_DEVICE_MODEL: "/usr/lib/xen/bin/qemu-dm",
},
HT_KVM: {
HV_KERNEL_PATH: "/boot/vmlinuz-2.6-kvmU",
"""
for owner in self._signal_wait:
- owner.OnSignal(signal.SIGCHLD)
+ owner.OnSignal(signum)
def RegisterIO(self, owner, fd, condition):
"""Registers a receiver for I/O notifications
# TODO: event_poll/event_check/override
if op in (SOCKOP_SEND, SOCKOP_HANDSHAKE):
event_poll = select.POLLOUT
- event_check = select.POLLOUT
elif op == SOCKOP_RECV:
event_poll = select.POLLIN
- event_check = select.POLLIN | select.POLLPRI
elif op == SOCKOP_SHUTDOWN:
event_poll = None
- event_check = None
# The timeout is only used when OpenSSL requests polling for a condition.
# It is not advisable to have no timeout for shutdown.
def HasMessageBody(self):
"""Checks whether the HTTP message contains a body.
- Can be overriden by subclasses.
+ Can be overridden by subclasses.
"""
return bool(self._msg.body)
def ParseStartLine(self, start_line):
"""Parses the start line of a message.
- Must be overriden by subclass.
+ Must be overridden by subclass.
@type start_line: string
@param start_line: Start line string
"""
import logging
-import time
import re
import base64
import binascii
-from ganeti import constants
from ganeti import utils
from ganeti import http
def GetAuthRealm(self, req):
"""Returns the authentication realm for a request.
- MAY be overriden by a subclass, which then can return different realms for
+ MAY be overridden by a subclass, which then can return different realms for
different paths. Returning "None" means no authentication is needed for a
request.
def Authenticate(self, req, user, password):
"""Checks the password for a user.
- This function MUST be overriden by a subclass.
+ This function MUST be overridden by a subclass.
"""
raise NotImplementedError()
"""
-import BaseHTTPServer
-import cgi
-import logging
-import OpenSSL
import os
import select
import socket
-import sys
-import time
-import signal
import errno
import threading
-from ganeti import constants
-from ganeti import serializer
from ganeti import workerpool
-from ganeti import utils
from ganeti import http
import time
import signal
-from ganeti import constants
-from ganeti import serializer
-from ganeti import utils
from ganeti import http
# As soon as too many children run, we'll not respond to new
# requests. The real solution would be to add a timeout for children
# and killing them after some time.
- pid, status = os.waitpid(0, 0)
+ pid, _ = os.waitpid(0, 0)
except os.error:
pid = None
if pid and pid in self._children:
def PreHandleRequest(self, req):
"""Called before handling a request.
- Can be overriden by a subclass.
+ Can be overridden by a subclass.
"""
def HandleRequest(self, req):
"""Handles a request.
- Must be overriden by subclass.
+ Must be overridden by subclass.
"""
raise NotImplementedError()
import os
import os.path
-import re
from ganeti import utils
from ganeti import constants
@return: list of (name, id, memory, vcpus, state, time spent)
"""
- for dummy in range(5):
+ for _ in range(5):
result = utils.RunCmd(["xm", "list"])
if not result.failed:
break
constants.HV_NIC_TYPE,
constants.HV_PAE,
constants.HV_VNC_BIND_ADDRESS,
+ constants.HV_KERNEL_PATH,
+ constants.HV_DEVICE_MODEL,
]
@classmethod
" be an absolute path or None, not %s" %
iso_path)
+ if not hvparams[constants.HV_KERNEL_PATH]:
+ raise errors.HypervisorError("Need a kernel for the instance")
+
+ if not os.path.isabs(hvparams[constants.HV_KERNEL_PATH]):
+ raise errors.HypervisorError("The kernel path must be an absolute path")
+
+ if not hvparams[constants.HV_DEVICE_MODEL]:
+ raise errors.HypervisorError("Need a device model for the instance")
+
+ if not os.path.isabs(hvparams[constants.HV_DEVICE_MODEL]):
+ raise errors.HypervisorError("The device model must be an absolute path")
+
+
def ValidateParameters(self, hvparams):
"""Check the given parameters for validity.
" an existing regular file, not %s" %
iso_path)
+ kernel_path = hvparams[constants.HV_KERNEL_PATH]
+ if not os.path.isfile(kernel_path):
+ raise errors.HypervisorError("Instance kernel '%s' not found or"
+ " not a file" % kernel_path)
+
+ device_model = hvparams[constants.HV_DEVICE_MODEL]
+ if not os.path.isfile(device_model):
+ raise errors.HypervisorError("Device model '%s' not found or"
+ " not a file" % device_model)
+
@classmethod
def _WriteConfigFile(cls, instance, block_devices):
"""Create a Xen 3.1 HVM config file.
config = StringIO()
config.write("# this is autogenerated by Ganeti, please do not edit\n#\n")
- config.write("kernel = '/usr/lib/xen/boot/hvmloader'\n")
+
+ # kernel handling
+ kpath = hvp[constants.HV_KERNEL_PATH]
+ config.write("kernel = '%s'\n" % kpath)
+
config.write("builder = 'hvm'\n")
config.write("memory = %d\n" % instance.beparams[constants.BE_MEMORY])
config.write("vcpus = %d\n" % instance.beparams[constants.BE_VCPUS])
config.write("name = '%s'\n" % instance.name)
- if instance.hvparams[constants.HV_PAE]:
+ if hvp[constants.HV_PAE]:
config.write("pae = 1\n")
else:
config.write("pae = 0\n")
- if instance.hvparams[constants.HV_ACPI]:
+ if hvp[constants.HV_ACPI]:
config.write("acpi = 1\n")
else:
config.write("acpi = 0\n")
config.write("apic = 1\n")
- arch = os.uname()[4]
- if '64' in arch:
- config.write("device_model = '/usr/lib64/xen/bin/qemu-dm'\n")
- else:
- config.write("device_model = '/usr/lib/xen/bin/qemu-dm'\n")
+ config.write("device_model = '%s'\n" % hvp[constants.HV_DEVICE_MODEL])
config.write("boot = '%s'\n" % hvp[constants.HV_BOOT_ORDER])
config.write("sdl = 0\n")
config.write("usb = 1\n")
class _QueuedOpCode(object):
- """Encasulates an opcode object.
+ """Encapsulates an opcode object.
@ivar log: holds the execution log and consists of tuples
of the form C{(log_serial, timestamp, level, message)}
@ivar stop_timestamp: timestamp for the end of the execution
"""
+ __slots__ = ["input", "status", "result", "log",
+ "start_timestamp", "end_timestamp",
+ "__weakref__"]
+
def __init__(self, op):
"""Constructor for the _QuededOpCode.
@ivar change: a Condition variable we use for waiting for job changes
"""
+ __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
+ "received_timestamp", "start_timestamp", "end_timestamp",
+ "change",
+ "__weakref__"]
+
def __init__(self, queue, job_id, ops):
"""Constructor for the _QueuedJob.
"""Selectively returns the log entries.
@type newer_than: None or int
- @param newer_than: if this is None, return all log enties,
+ @param newer_than: if this is None, return all log entries,
otherwise return only the log entries with serial higher
than this value
@rtype: list
return entries
+ def MarkUnfinishedOps(self, status, result):
+ """Mark unfinished opcodes with a given status and result.
+
+ This is a utility function for marking all running or waiting to
+ be run opcodes with a given status. Opcodes which are already
+ finalized are not changed.
+
+ @param status: a given opcode status
+ @param result: the opcode result
+
+ """
+ not_marked = True
+ for op in self.ops:
+ if op.status in constants.OPS_FINALIZED:
+ assert not_marked, "Finalized opcodes found after non-finalized ones"
+ continue
+ op.status = status
+ op.result = result
+ not_marked = False
+
class _JobQueueWorker(workerpool.BaseWorker):
"""The actual job workers.
count = len(job.ops)
for idx, op in enumerate(job.ops):
op_summary = op.input.Summary()
+ if op.status == constants.OP_STATUS_SUCCESS:
+ # this is a job that was partially completed before master
+ # daemon shutdown, so it can be expected that some opcodes
+ # are already completed successfully (if any did error
+ # out, then the whole job should have been aborted and not
+ # resubmitted for processing)
+ logging.info("Op %s/%s: opcode %s already processed, skipping",
+ idx + 1, count, op_summary)
+ continue
try:
logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
op_summary)
queue.acquire()
try:
try:
- job.run_op_idx = -1
+ job.run_op_index = -1
job.end_timestamp = TimeStampNow()
queue.UpdateJobUnlocked(job)
finally:
class JobQueue(object):
- """Quue used to manaage the jobs.
+ """Queue used to manage the jobs.
@cvar _RE_JOB_FILE: regex matching the valid job file names
constants.JOB_STATUS_CANCELING):
logging.warning("Unfinished job %s found: %s", job.id, job)
try:
- for op in job.ops:
- op.status = constants.OP_STATUS_ERROR
- op.result = "Unclean master daemon shutdown"
+ job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+ "Unclean master daemon shutdown")
finally:
self.UpdateJobUnlocked(job)
Since we aim to keep consistency should this node (the current
master) fail, we will log errors if our rpc fail, and especially
- log the case when more than half of the nodes failes.
+ log the case when more than half of the nodes fail.
@param result: the data as returned from the rpc call
@type nodes: list
"""
return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
- def _NewSerialUnlocked(self):
+ def _NewSerialsUnlocked(self, count):
"""Generates a new job identifier.
Job identifiers are unique during the lifetime of a cluster.
+ @type count: integer
+ @param count: how many serials to return
@rtype: str
@return: a string representing the job identifier.
"""
+ assert count > 0
# New number
- serial = self._last_serial + 1
+ serial = self._last_serial + count
# Write to file
self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
"%s\n" % serial)
+ result = [self._FormatJobID(v)
+ for v in range(self._last_serial, serial + 1)]
# Keep it only if we were able to write the file
self._last_serial = serial
- return self._FormatJobID(serial)
+ return result
@staticmethod
def _GetJobPath(job_id):
and in the future we might merge them.
@type drain_flag: boolean
- @param drain_flag: wheter to set or unset the drain flag
+ @param drain_flag: Whether to set or unset the drain flag
"""
if drain_flag:
utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
return True
- @utils.LockedMethod
@_RequireOpenQueue
- def SubmitJob(self, ops):
+ def _SubmitJobUnlocked(self, job_id, ops):
"""Create and store a new job.
This enters the job into our job queue and also puts it on the new
queue, in order for it to be picked up by the queue processors.
+ @type job_id: job ID
+ @param job_id: the job ID for the new job
@type ops: list
@param ops: The list of OpCodes that will become the new job.
@rtype: job ID
"""
if self._IsQueueMarkedDrain():
- raise errors.JobQueueDrainError()
+ raise errors.JobQueueDrainError("Job queue is drained, refusing job")
# Check job queue size
size = len(self._ListJobFiles())
if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
raise errors.JobQueueFull()
- # Get job identifier
- job_id = self._NewSerialUnlocked()
job = _QueuedJob(self, job_id, ops)
# Write to disk
return job.id
+ @utils.LockedMethod
+ @_RequireOpenQueue
+ def SubmitJob(self, ops):
+ """Create and store a new job.
+
+ @see: L{_SubmitJobUnlocked}
+
+ """
+ job_id = self._NewSerialsUnlocked(1)[0]
+ return self._SubmitJobUnlocked(job_id, ops)
+
+ @utils.LockedMethod
+ @_RequireOpenQueue
+ def SubmitManyJobs(self, jobs):
+ """Create and store multiple jobs.
+
+ @see: L{_SubmitJobUnlocked}
+
+ """
+ results = []
+ all_job_ids = self._NewSerialsUnlocked(len(jobs))
+ for job_id, ops in zip(all_job_ids, jobs):
+ try:
+ data = self._SubmitJobUnlocked(job_id, ops)
+ status = True
+ except errors.GenericError, err:
+ data = str(err)
+ status = False
+ results.append((status, data))
+
+ return results
+
+
@_RequireOpenQueue
def UpdateJobUnlocked(self, job):
"""Update a job's on disk storage.
"""
logging.debug("Waiting for changes in job %s", job_id)
+
+ job_info = None
+ log_entries = None
+
end_time = time.time() + timeout
while True:
delta_time = end_time - time.time()
logging.debug("Job %s changed", job_id)
- return (job_info, log_entries)
+ if job_info is None and log_entries is None:
+ return None
+ else:
+ return (job_info, log_entries)
@utils.LockedMethod
@_RequireOpenQueue
if job_status not in (constants.JOB_STATUS_QUEUED,
constants.JOB_STATUS_WAITLOCK):
- logging.debug("Job %s is no longer in the queue", job.id)
- return (False, "Job %s is no longer in the queue" % job.id)
+ logging.debug("Job %s is no longer waiting in the queue", job.id)
+ return (False, "Job %s is no longer waiting in the queue" % job.id)
if job_status == constants.JOB_STATUS_QUEUED:
self.CancelJobUnlocked(job)
elif job_status == constants.JOB_STATUS_WAITLOCK:
# The worker will notice the new status and cancel the job
try:
- for op in job.ops:
- op.status = constants.OP_STATUS_CANCELING
+ job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
finally:
self.UpdateJobUnlocked(job)
return (True, "Job %s will be canceled" % job.id)
"""
try:
- for op in job.ops:
- op.status = constants.OP_STATUS_CANCELED
- op.result = "Job canceled by request"
+ job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
+ "Job canceled by request")
finally:
self.UpdateJobUnlocked(job)
"""Module implementing the job queue handling."""
import os
-import logging
import errno
-import re
from ganeti import constants
from ganeti import errors
# Whenever we want to acquire a full LockSet we pass None as the value
-# to acquire. Hide this behing this nicely named constant.
+# to acquire. Hide this behind this nicely named constant.
ALL_SET = None
# Of course something is going to be really wrong, after this.
if lock._is_owned():
lock.release()
- raise
+ raise
except:
# If something went wrong and we had the set-lock let's release it...
class GanetiLockManager:
"""The Ganeti Locking Library
- The purpouse of this small library is to manage locking for ganeti clusters
+ The purpose of this small library is to manage locking for ganeti clusters
in a central place, while at the same time doing dynamic checks against
possible deadlocks. It will also make it easier to transition to a different
lock type should we migrate away from python threads.
"""Acquire a set of resource locks, at the same level.
@param level: the level at which the locks shall be acquired;
- it must be a memmber of LEVELS.
+ it must be a member of LEVELS.
@param names: the names of the locks which shall be acquired
(special lock names, or instance/node names)
@param shared: whether to acquire in shared mode; by default
mode, before releasing them.
@param level: the level at which the locks shall be released;
- it must be a memmber of LEVELS
+ it must be a member of LEVELS
@param names: the names of the locks which shall be released
(defaults to all the locks acquired at that level)
"""Add locks at the specified level.
@param level: the level at which the locks shall be added;
- it must be a memmber of LEVELS_MOD.
+ it must be a member of LEVELS_MOD.
@param names: names of the locks to acquire
@param acquired: whether to acquire the newly added locks
@param shared: whether the acquisition will be shared
KEY_RESULT = "result"
REQ_SUBMIT_JOB = "SubmitJob"
+REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
raise EncodingError("Message terminator found in payload")
self._CheckSocket()
try:
+ # TODO: sendall is not guaranteed to send everything
self.socket.sendall(msg + self.eom)
except socket.timeout, err:
raise TimeoutError("Sending timeout: %s" % str(err))
def Recv(self):
- """Try to receive a messae from the socket.
+ """Try to receive a message from the socket.
In case we already have messages queued, we just return from the
queue. Otherwise, we try to read data with a _rwtimeout network
while not self._msgs:
if time.time() > etime:
raise TimeoutError("Extended receive timeout")
- try:
- data = self.socket.recv(4096)
- except socket.timeout, err:
- raise TimeoutError("Receive timeout: %s" % str(err))
+ while True:
+ try:
+ data = self.socket.recv(4096)
+ except socket.error, err:
+ if err.args and err.args[0] == errno.EAGAIN:
+ continue
+ raise
+ except socket.timeout, err:
+ raise TimeoutError("Receive timeout: %s" % str(err))
+ break
if not data:
raise ConnectionClosedError("Connection closed while reading")
new_msgs = (self._buffer + data).split(self.eom)
old_transp = self.transport
self.transport = None
old_transp.Close()
- except Exception, err:
+ except Exception:
pass
def CallMethod(self, method, args):
ops_state = map(lambda op: op.__getstate__(), ops)
return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
+ def SubmitManyJobs(self, jobs):
+ jobs_state = []
+ for ops in jobs:
+ jobs_state.append([op.__getstate__() for op in ops])
+ return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
+
def CancelJob(self, job_id):
return self.CallMethod(REQ_CANCEL_JOB, job_id)
self.context.glm.add(level, add_locks, acquired=1, shared=share)
except errors.LockError:
raise errors.OpPrereqError(
- "Coudn't add locks (%s), probably because of a race condition"
+ "Couldn't add locks (%s), probably because of a race condition"
" with another job, who added them first" % add_locks)
try:
try:
@type run_notifier: callable (no arguments) or None
@param run_notifier: this function (if callable) will be called when
we are about to call the lu's Exec() method, that
- is, after we have aquired all locks
+ is, after we have acquired all locks
"""
if not isinstance(op, opcodes.OpCode):
phase = constants.HOOKS_PHASE_POST
hpath = constants.HOOKS_NAME_CFGUPDATE
nodes = [self.lu.cfg.GetMasterNode()]
- results = self._RunWrapper(nodes, hpath, phase)
+ self._RunWrapper(nodes, hpath, phase)
def __init__(self, **kwargs):
for k, v in kwargs.iteritems():
setattr(self, k, v)
+ self.UpgradeConfig()
def __getattr__(self, name):
if name not in self.__slots__:
"""Implement __repr__ for ConfigObjects."""
return repr(self.ToDict())
+ def UpgradeConfig(self):
+ """Fill defaults for missing configuration values.
+
+ This method will be called at object init time, and its implementation will
+ be object dependent.
+
+ """
+ pass
+
class TaggableObject(ConfigObject):
"""An generic class supporting tags.
"""Checks that this disk is correctly configured.
"""
- errors = []
+ all_errors = []
if self.mode not in constants.DISK_ACCESS_SET:
- errors.append("Disk access mode '%s' is invalid" % (self.mode, ))
- return errors
+ all_errors.append("Disk access mode '%s' is invalid" % (self.mode, ))
+ return all_errors
class Instance(TaggableObject):
"hvparams",
"beparams",
"candidate_pool_size",
+ "modify_etc_hosts",
]
+ def UpgradeConfig(self):
+ """Fill defaults for missing configuration values.
+
+ """
+ if self.hvparams is None:
+ self.hvparams = constants.HVC_DEFAULTS
+ else:
+ for hypervisor in self.hvparams:
+ self.hvparams[hypervisor] = self.FillDict(
+ constants.HVC_DEFAULTS[hypervisor], self.hvparams[hypervisor])
+
+ if self.beparams is None:
+ self.beparams = {constants.BEGR_DEFAULT: constants.BEC_DEFAULTS}
+ else:
+ for begroup in self.beparams:
+ self.beparams[begroup] = self.FillDict(constants.BEC_DEFAULTS,
+ self.beparams[begroup])
+
+ if self.modify_etc_hosts is None:
+ self.modify_etc_hosts = True
+
def ToDict(self):
"""Custom function for cluster.
raise ValueError("Invalid data to LoadOpcode, missing OP_ID")
op_id = data["OP_ID"]
op_class = None
- for item in globals().values():
- if (isinstance(item, type) and
- issubclass(item, cls) and
- hasattr(item, "OP_ID") and
- getattr(item, "OP_ID") == op_id):
- op_class = item
- break
- if op_class is None:
+ if op_id in OP_MAPPING:
+ op_class = OP_MAPPING[op_id]
+ else:
raise ValueError("Invalid data to LoadOpCode: OP_ID %s unsupported" %
op_id)
op = op_class()
"mem_size", "disks", "disk_template",
"os", "tags", "nics", "vcpus", "hypervisor",
]
+
+OP_MAPPING = dict([(v.OP_ID, v) for v in globals().values()
+ if (isinstance(v, type) and issubclass(v, OpCode) and
+ hasattr(v, "OP_ID"))])
import logging
-import ganeti.cli
-
from ganeti import luxi
from ganeti import rapi
from ganeti import http
val = 0
try:
val = int(val)
- except (ValueError, TypeError), err:
+ except (ValueError, TypeError):
raise http.HttpBadRequest("Invalid value for the"
" '%s' parameter" % (name,))
return val
"mtotal", "mnode", "mfree",
"pinst_cnt", "sinst_cnt", "tags",
"ctotal", "cnodes", "csockets",
+ "pip", "sip", "serial_no", "role",
+ "pinst_list", "sinst_list",
]
instance_name = self.items[0]
reboot_type = self.queryargs.get('type',
[constants.INSTANCE_REBOOT_HARD])[0]
- ignore_secondaries = bool(self.queryargs.get('ignore_secondaries',
- [False])[0])
+ ignore_secondaries = bool(self._checkIntVariable('ignore_secondaries'))
op = opcodes.OpRebootInstance(instance_name=instance_name,
reboot_type=reboot_type,
ignore_secondaries=ignore_secondaries)
"""
instance_name = self.items[0]
- force_startup = bool(self.queryargs.get('force', [False])[0])
+ force_startup = bool(self._checkIntVariable('force'))
op = opcodes.OpStartupInstance(instance_name=instance_name,
force=force_startup)
# R0904: Too many public methods
import os
-import socket
import logging
import zlib
import base64
calls we can't raise an exception just because one one out of many
failed, and therefore we use this class to encapsulate the result.
- @ivar data: the data payload, for successfull results, or None
+ @ivar data: the data payload, for successful results, or None
@type failed: boolean
@ivar failed: whether the operation failed at RPC level (not
application level on the remote node)
list of nodes, will contact (in parallel) all nodes, and return a
dict of results (key: node name, value: result).
- One current bug is that generic failure is still signalled by
+ One current bug is that generic failure is still signaled by
'False' result, which is not good. This overloading of values can
cause bugs.
@return: List of RPC results
"""
- assert _http_manager, "RPC module not intialized"
+ assert _http_manager, "RPC module not initialized"
_http_manager.ExecRequests(self.nc.values())
@type instance: L{objects.Instance}
@param instance: an Instance object
@type hvp: dict or None
- @param hvp: a dictionary with overriden hypervisor parameters
+ @param hvp: a dictionary with overridden hypervisor parameters
@type bep: dict or None
- @param bep: a dictionary with overriden backend parameters
+ @param bep: a dictionary with overridden backend parameters
@rtype: dict
@return: the instance dict, with the hvparams filled with the
cluster defaults
def _ConnectList(self, client, node_list, call):
"""Helper for computing node addresses.
- @type client: L{Client}
+ @type client: L{ganeti.rpc.Client}
@param client: a C{Client} instance
@type node_list: list
@param node_list: the node list we should connect
def _ConnectNode(self, client, node, call):
"""Helper for computing one node's address.
- @type client: L{Client}
+ @type client: L{ganeti.rpc.Client}
@param client: a C{Client} instance
@type node: str
@param node: the node we should connect
"""Simple class to write configuration file.
"""
- def SetMasterNode(self, node):
- """Change master node.
-
- """
- self._config_data["cluster"]["master_node"] = node
-
def Save(self):
"""Writes configuration file.
connected to).
This is used to detect problems in ssh known_hosts files
- (conflicting known hosts) and incosistencies between dns/hosts
+ (conflicting known hosts) and inconsistencies between dns/hosts
entries and local machine names
@param node: nodename of a host to check; can be short or
"""
-import sys
import os
import time
import subprocess
_locksheld = []
_re_shell_unquoted = re.compile('^[-.,=:/_+@A-Za-z0-9]+$')
-debug = False
debug_locks = False
#: when set to True, L{RunCmd} is disabled
directory for the command; the default will be /
@rtype: L{RunResult}
@return: RunResult instance
- @raise erors.ProgrammerError: if we call this when forks are disabled
+ @raise errors.ProgrammerError: if we call this when forks are disabled
"""
if no_fork:
"""
try:
nv = fn(val)
- except (ValueError, TypeError), err:
+ except (ValueError, TypeError):
nv = val
return nv
@type ip: str
@param ip: the address to be checked
@rtype: a regular expression match object
- @return: a regular epression match object, or None if the
+ @return: a regular expression match object, or None if the
address is not valid
"""
This function will check all arguments in the args list so that they
are valid shell parameters (i.e. they don't contain shell
- metacharaters). If everything is ok, it will return the result of
+ metacharacters). If everything is ok, it will return the result of
template % args.
@type template: str
@type args: list
@param args: list of arguments to be quoted
@rtype: str
- @return: the quoted arguments concatenaned with spaces
+ @return: the quoted arguments concatenated with spaces
"""
return ' '.join([ShellQuote(i) for i in args])
@type port: int
@param port: the port to connect to
@type timeout: int
- @param timeout: the timeout on the connection attemp
+ @param timeout: the timeout on the connection attempt
@type live_port_needed: boolean
@param live_port_needed: whether a closed port will cause the
function to return failure, as if there was a timeout
if source is not None:
try:
sock.bind((source, 0))
- except socket.error, (errcode, errstring):
+ except socket.error, (errcode, _):
if errcode == errno.EADDRNOTAVAIL:
success = False
address.
@type address: string
- @param address: the addres to check
+ @param address: the address to check
@rtype: bool
@return: True if we own the address
@type size: None or int
@param size: Read at most size bytes
@rtype: str
- @return: the (possibly partial) conent of the file
+ @return: the (possibly partial) content of the file
"""
f = open(file_name, "r")
def all(seq, pred=bool):
"Returns True if pred(x) is True for every element in the iterable"
- for elem in itertools.ifilterfalse(pred, seq):
+ for _ in itertools.ifilterfalse(pred, seq):
return False
return True
def any(seq, pred=bool):
"Returns True if pred(x) is True for at least one element in the iterable"
- for elem in itertools.ifilter(pred, seq):
+ for _ in itertools.ifilter(pred, seq):
return True
return False
Element order is preserved.
@type seq: sequence
- @param seq: the sequence with the source elementes
+ @param seq: the sequence with the source elements
@rtype: list
@return: list of unique elements from seq
def IsValidMac(mac):
"""Predicate to check if a MAC address is valid.
- Checks wether the supplied MAC address is formally correct, only
+ Checks whether the supplied MAC address is formally correct, only
accepts colon separated format.
@type mac: str
@param name: the daemon name used to derive the pidfile name
"""
- pid = os.getpid()
pidfilename = DaemonPidFileName(name)
# TODO: we could check here that the file contains our pid
try:
"""
if isinstance(text, unicode):
- # onli if unicode; if str already, we handle it below
+ # only if unicode; if str already, we handle it below
text = text.encode('ascii', 'backslashreplace')
resu = ""
for char in text:
<cmdsynopsis>
<command>submit-job</command>
-
+ <arg choice="opt">--verbose</arg>
+ <arg choice="opt">--timing-stats</arg>
+ <arg choice="opt">--job-repeat <option>N</option></arg>
+ <arg choice="opt">--op-repeat <option>N</option></arg>
<arg choice="req" rep="repeat">opcodes_file</arg>
</cmdsynopsis>
command line.
</para>
+ <para>
+ The <option>verbose</option> option will show the job IDs of
+ the submitted jobs and the progress in waiting for the jobs;
+ the <option>timing-stats</option> option will show some
+ overall statistics with the number of total opcodes and jobs
+ submitted, and the time for each stage (submit, exec, total).
+ </para>
+
+ <para>
+ The <option>job-repeat</option> and <option>op-repeat</option>
+ options allow submitting multiple copies of the passed
+ arguments; the job repeat will cause N copies of each job
+ (input file) to be submitted (equivalent to passing the
+ arguments N times) while the op repeat will cause each job to
+ contain multiple copies of the opcodes (equivalent to each
+ file containing N copies of the opcodes).
+ </para>
+
</refsect2>
</refsect1>
<cmdsynopsis>
<command>reinstall</command>
<arg choice="opt">-o <replaceable>os-type</replaceable></arg>
- <arg choice="opt">-f <replaceable>force</replaceable></arg>
<arg>--select-os</arg>
+ <arg choice="opt">-f <replaceable>force</replaceable></arg>
+ <arg>--force-multiple</arg>
+ <sbr>
+ <group choice="opt">
+ <arg>--instance</arg>
+ <arg>--node</arg>
+ <arg>--primary</arg>
+ <arg>--secondary</arg>
+ <arg>--all</arg>
+ </group>
<arg>--submit</arg>
- <arg choice="req"><replaceable>instance</replaceable></arg>
+ <arg choice="opt" rep="repeat"><replaceable>instance</replaceable></arg>
</cmdsynopsis>
<para>
- Reinstalls the operating system on the given instance. The
- instance must be stopped when running this command. If the
+ Reinstalls the operating system on the given instance(s). The
+ instance(s) must be stopped when running this command. If the
<option>--os-type</option> is specified, the operating
system is changed.
</para>
<para>
- Since reinstall is potentially dangerous command, the user
- will be required to confirm this action, unless the
- <option>-f</option> flag is passed.
- </para>
-
- <para>
The <option>--select-os</option> option switches to an
interactive OS reinstall. The user is prompted to select the OS
template from the list of available OS templates.
</para>
<para>
+ Since this is a potentially dangerous command, the user will
+ be required to confirm this action, unless the
+ <option>-f</option> flag is passed. When multiple instances
+ are selected (either by passing multiple arguments or by
+ using the <option>--node</option>,
+ <option>--primary</option>, <option>--secondary</option> or
+ <option>--all</option> options), the user must pass both the
+ <option>--force</option> and
+ <option>--force-multiple</option> options to skip the
+ interactive confirmation.
+ </para>
+
+ <para>
The <option>--submit</option> option is used to send the job to
the master daemon but not wait for its completion. The job
ID will be shown so that it can be examined via
<refsect2>
<title>INFO</title>
<cmdsynopsis>
- <command>cancel</command>
+ <command>info</command>
<arg choice="req" rep="repeat"><replaceable>id</replaceable></arg>
</cmdsynopsis>
</para>
</refsect2>
+
+ <refsect2>
+ <title>WATCH</title>
+ <cmdsynopsis>
+ <command>watch</command>
+ <arg>id</arg>
+ </cmdsynopsis>
+
+ <para>
+ This command follows the output of the job with the given
+ <replaceable>id</replaceable> and prints it.
+ </para>
+ </refsect2>
+
</refsect1>
&footer;
discussion in <citerefentry>
<refentrytitle>gnt-cluster</refentrytitle>
<manvolnum>8</manvolnum> </citerefentry> for more
- informations.
+ information.
</para>
<para>
--- /dev/null
+# Configuration file for pylint (http://www.logilab.org/project/pylint). See
+# http://www.logilab.org/card/pylintfeatures for more detailed variable
+# descriptions.
+
+[MASTER]
+profile = no
+ignore =
+persistent = no
+cache-size = 50000
+load-plugins =
+
+[REPORTS]
+output-format = colorized
+include-ids = no
+files-output = no
+reports = no
+evaluation = 10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10)
+comment = yes
+
+[BASIC]
+required-attributes =
+no-docstring-rgx = __.*__
+module-rgx = (([a-z_][a-z0-9_]*)|([A-Z][a-zA-Z0-9]+))$
+const-rgx = ((_{0,2}[A-Z][A-Z0-9_]*)|(__.*__))$
+class-rgx = _?[A-Z][a-zA-Z0-9]+$
+function-rgx = (_?([A-Z]+[a-z0-9]+([A-Z]+[a-z0-9]*)*)|main)$
+method-rgx = (_{0,2}[A-Z]+[a-z0-9]+([A-Z]+[a-z0-9]*)*|__.*__)$
+attr-rgx = [a-z_][a-z0-9_]{1,30}$
+argument-rgx = [a-z_][a-z0-9_]*$
+variable-rgx = (_?([a-z_][a-z0-9_]*)|([A-Z0-9_]+))$
+inlinevar-rgx = [A-Za-z_][A-Za-z0-9_]*$
+good-names = i,j,k,_
+bad-names = foo,bar,baz,toto,tutu,tata
+bad-functions =
+
+[TYPECHECK]
+ignore-mixin-members = yes
+zope = no
+acquired-members =
+
+[VARIABLES]
+init-import = no
+dummy-variables-rgx = _
+additional-builtins =
+
+[CLASSES]
+ignore-iface-methods =
+defining-attr-methods = __init__,__new__,setUp
+
+[DESIGN]
+max-args = 6
+max-locals = 15
+max-returns = 6
+max-branchs = 12
+max-statements = 50
+max-parents = 7
+max-attributes = 7
+min-public-methods = 2
+max-public-methods = 20
+
+[IMPORTS]
+deprecated-modules = regsub,string,TERMIOS,Bastion,rexec
+import-graph =
+ext-import-graph =
+int-import-graph =
+
+[FORMAT]
+max-line-length = 80
+max-module-lines = 1000
+indent-string = " "
+
+[MISCELLANEOUS]
+notes = FIXME,XXX,TODO
+
+[SIMILARITIES]
+min-similarity-lines = 4
+ignore-comments = yes
+ignore-docstrings = yes
"""Return whether remote API tests should be run.
"""
- return constants.RAPI_ENABLE and qa_config.TestEnabled('rapi')
-
-
-def PrintRemoteAPIWarning():
- """Print warning if remote API is not enabled.
-
- """
- if constants.RAPI_ENABLE or not qa_config.TestEnabled('rapi'):
- return
- msg = ("Remote API is not enabled in this Ganeti build. Please run"
- " `configure [...] --enable-rapi'.")
- print
- print qa_utils.FormatWarning(msg)
+ return qa_config.TestEnabled('rapi')
def _DoTests(uris):
hvparams[hv][parameter] = constants.HVC_DEFAULTS[hv][parameter]
utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)
- for hv in hvlist:
- if hv not in constants.HYPER_TYPES:
- ToStderr("invalid hypervisor: %s", hv)
- return 1
-
bootstrap.InitCluster(cluster_name=args[0],
secondary_ip=opts.secondary_ip,
vg_name=vg_name,
hvparams=hvparams,
beparams=beparams,
candidate_pool_size=opts.candidate_pool_size,
+ modify_etc_hosts=opts.modify_etc_hosts,
)
return 0
ToStdout("Architecture (this node): %s (%s)",
result["architecture"][0], result["architecture"][1])
+ if result["tags"]:
+ tags = ", ".join(utils.NiceSort(result["tags"]))
+ else:
+ tags = "(none)"
+
+ ToStdout("Tags: %s", tags)
+
ToStdout("Default hypervisor: %s", result["default_hypervisor"])
ToStdout("Enabled hypervisors: %s", ", ".join(result["enabled_hypervisors"]))
help="No support for lvm based instances"
" (cluster-wide)",
action="store_false", default=True,),
+ make_option("--no-etc-hosts", dest="modify_etc_hosts",
+ help="Don't modify /etc/hosts"
+ " (cluster-wide)",
+ action="store_false", default=True,),
make_option("--enabled-hypervisors", dest="enabled_hypervisors",
help="Comma-separated list of hypervisors",
type="string", default=None),
"""
cl = cli.GetClient()
- job_data = []
- job_ids = []
- for fname in args:
- op_data = simplejson.loads(open(fname).read())
- op_list = [opcodes.OpCode.LoadOpCode(val) for val in op_data]
- job_data.append((fname, op_list))
- for fname, op_list in job_data:
- jid = cli.SendJob(op_list, cl=cl)
- ToStdout("File '%s', job id: %s", fname, jid)
- job_ids.append(jid)
- for jid in job_ids:
- ToStdout("Waiting for job id %s", jid)
- cli.PollJob(jid, cl=cl)
+ jex = cli.JobExecutor(cl=cl, verbose=opts.verbose)
+
+ job_cnt = 0
+ op_cnt = 0
+ if opts.timing_stats:
+ ToStdout("Loading...")
+ for job_idx in range(opts.rep_job):
+ for fname in args:
+ op_data = simplejson.loads(open(fname).read())
+ op_list = [opcodes.OpCode.LoadOpCode(val) for val in op_data]
+ op_list = op_list * opts.rep_op
+ jex.QueueJob("file %s/%d" % (fname, job_idx), *op_list)
+ op_cnt += len(op_list)
+ job_cnt += 1
+
+ if opts.timing_stats:
+ t1 = time.time()
+ ToStdout("Submitting...")
+ jex.SubmitPending()
+
+ if opts.timing_stats:
+ t2 = time.time()
+ ToStdout("Executing...")
+ jex.GetResults()
+ if opts.timing_stats:
+ t3 = time.time()
+ ToStdout("C:op %4d" % op_cnt)
+ ToStdout("C:job %4d" % job_cnt)
+ ToStdout("T:submit %4.4f" % (t2-t1))
+ ToStdout("T:exec %4.4f" % (t3-t2))
+ ToStdout("T:total %4.4f" % (t3-t1))
return 0
"[opts...] <duration>", "Executes a TestDelay OpCode"),
'submit-job': (GenericOpCodes, ARGS_ATLEAST(1),
[DEBUG_OPT,
+ make_option("--op-repeat", type="int", default="1",
+ dest="rep_op",
+ help="Repeat the opcode sequence this number"
+ " of times"),
+ make_option("--job-repeat", type="int", default="1",
+ dest="rep_job",
+ help="Repeat the job this number"
+ " of times"),
+ make_option("-v", "--verbose", default=False,
+ action="store_true",
+ help="Make the operation more verbose"),
+ make_option("--timing-stats", default=False,
+ action="store_true",
+ help="Show timing stats"),
],
"<op_list_file...>", "Submits jobs built from json files"
" containing a list of serialized opcodes"),
return inames
-def _ConfirmOperation(inames, text):
+def _ConfirmOperation(inames, text, extra=""):
"""Ask the user to confirm an operation on a list of instances.
This function is used to request confirmation for doing an operation
"""
count = len(inames)
- msg = ("The %s will operate on %d instances.\n"
- "Do you want to continue?" % (text, count))
+ msg = ("The %s will operate on %d instances.\n%s"
+ "Do you want to continue?" % (text, count, extra))
affected = ("\nAffected instances:\n" +
"\n".join([" %s" % name for name in inames]))
This function will raise an OpPrereqError in case they don't
exist. Otherwise it will exit cleanly.
- @type client: L{luxi.Client}
+ @type client: L{ganeti.luxi.Client}
@param client: the client to use for the query
@type names: list
@param names: the list of instance names to query
ToStderr("Can't parse the instance definition file: %s" % str(err))
return 1
+ jex = JobExecutor()
+
# Iterate over the instances and do:
# * Populate the specs with default value
# * Validate the instance specs
file_storage_dir=specs['file_storage_dir'],
file_driver=specs['file_driver'])
- ToStdout("%s: %s", name, cli.SendJob([op]))
+ jex.QueueJob(name, op)
+ # we never want to wait, just show the submitted job IDs
+ jex.WaitOrShow(False)
return 0
@return: the desired exit code
"""
- instance_name = args[0]
+ # first, compute the desired name list
+ if opts.multi_mode is None:
+ opts.multi_mode = _SHUTDOWN_INSTANCES
+
+ inames = _ExpandMultiNames(opts.multi_mode, args)
+ if not inames:
+ raise errors.OpPrereqError("Selection filter does not match any instances")
+ # second, if requested, ask for an OS
if opts.select_os is True:
op = opcodes.OpDiagnoseOS(output_fields=["name", "valid"], names=[])
result = SubmitOpCode(op)
choices)
if selected == 'exit':
- ToStdout("User aborted reinstall, exiting")
+ ToStderr("User aborted reinstall, exiting")
return 1
os_name = selected
else:
os_name = opts.os
- if not opts.force:
- usertext = ("This will reinstall the instance %s and remove"
- " all data. Continue?") % instance_name
- if not AskUser(usertext):
+ # third, get confirmation: multi-reinstall requires --force-multi
+ # *and* --force, single-reinstall just --force
+ multi_on = opts.multi_mode != _SHUTDOWN_INSTANCES or len(inames) > 1
+ if multi_on:
+ warn_msg = "Note: this will remove *all* data for the below instances!\n"
+ if not ((opts.force_multi and opts.force) or
+ _ConfirmOperation(inames, "reinstall", extra=warn_msg)):
return 1
+ else:
+ if not opts.force:
+ usertext = ("This will reinstall the instance %s and remove"
+ " all data. Continue?") % inames[0]
+ if not AskUser(usertext):
+ return 1
+
+ jex = JobExecutor(verbose=multi_on)
+ for instance_name in inames:
+ op = opcodes.OpReinstallInstance(instance_name=instance_name,
+ os_type=os_name)
+ jex.QueueJob(instance_name, op)
- op = opcodes.OpReinstallInstance(instance_name=instance_name,
- os_type=os_name)
- SubmitOrSend(op, opts)
-
+ jex.WaitOrShow(not opts.submit_only)
return 0
" The default field"
" list is (in order): %s." % ", ".join(_LIST_DEF_FIELDS),
),
- 'reinstall': (ReinstallInstance, ARGS_ONE,
+ 'reinstall': (ReinstallInstance, ARGS_ANY,
[DEBUG_OPT, FORCE_OPT, os_opt,
+ m_force_multi,
+ m_node_opt, m_pri_node_opt, m_sec_node_opt,
+ m_clust_opt, m_inst_opt,
make_option("--select-os", dest="select_os",
action="store_true", default=False,
help="Interactive OS reinstall, lists available"
from ganeti import constants
from ganeti import errors
from ganeti import utils
+from ganeti import cli
#: default list of fields for L{ListJobs}
return 0
+def WatchJob(opts, args):
+ """Follow a job and print its output as it arrives.
+
+ @param opts: the command line options selected by the user
+ @type args: list
+ @param args: Contains the job ID
+ @rtype: int
+ @return: the desired exit code
+
+ """
+ job_id = args[0]
+
+ msg = ("Output from job %s follows" % job_id)
+ ToStdout(msg)
+ ToStdout("-" * len(msg))
+
+ retcode = 0
+ try:
+ cli.PollJob(job_id)
+ except errors.GenericError, err:
+ (retcode, job_result) = cli.FormatError(err)
+ ToStderr("Job %s failed: %s", job_id, job_result)
+
+ return retcode
+
+
commands = {
'list': (ListJobs, ARGS_ANY,
[DEBUG_OPT, NOHDR_OPT, SEP_OPT, FIELDS_OPT],
'info': (ShowJobs, ARGS_ANY, [DEBUG_OPT],
"<job-id> [<job-id> ...]",
"Show detailed information about the specified jobs"),
+ 'watch': (WatchJob, ARGS_ONE, [DEBUG_OPT],
+ "<job-id>",
+ "Follows a job and prints its output as it arrives"),
}
cnt = [dst_node, iallocator].count(None)
if cnt != 1:
- raise errors.OpPrereqError("One and only one of the -n and -i"
+ raise errors.OpPrereqError("One and only one of the -n and -I"
" options must be passed")
selected_fields = ["name", "sinst_list"]
choices=('yes', 'no'), default=None,
help="Set the drained flag on the node"),
],
- "<instance>", "Alters the parameters of an instance"),
+ "<node_name>", "Alters the parameters of a node"),
'remove': (RemoveNode, ARGS_ONE, [DEBUG_OPT],
"<node_name>", "Removes a node from the cluster"),
'volumes': (ListVolumes, ARGS_ANY,
default_bridge=constants.DEFAULT_BRIDGE,
tcpudp_port_pool=set(),
default_hypervisor=constants.HT_FAKE,
+ enabled_hypervisors=[constants.HT_FAKE],
master_node=me.name,
master_ip="127.0.0.1",
master_netdev=constants.DEFAULT_BRIDGE,
USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")
+MAX_RETRIES = 3
class InstanceDown(Exception):
"""The checked instance was not up"""
+class BurninFailure(Exception):
+ """Failure detected during burning"""
+
+
def Usage():
"""Shows program usage information and exits the program."""
self.to_rem = []
self.queued_ops = []
self.opts = None
+ self.queue_retry = False
+ self.disk_count = self.disk_growth = self.disk_size = None
+ self.hvp = self.bep = None
self.ParseOptions()
self.cl = cli.GetClient()
self.GetState()
if self.opts.verbose:
Log(msg, indent=3)
- def ExecOp(self, *ops):
def MaybeRetry(self, retry_count, msg, fn, *args):
    """Possibly retry a given function execution.

    @type retry_count: int
    @param retry_count: retry counter:
        - 0: non-retryable action
        - 1: last retry for a retryable action
        - MAX_RETRIES: original try for a retryable action
    @type msg: str
    @param msg: the kind of the operation
    @type fn: callable
    @param fn: the function to be called
    @return: the result of fn(*args), possibly obtained after retries

    """
    try:
      val = fn(*args)
      if retry_count > 0 and retry_count < MAX_RETRIES:
        Log("Idempotent %s succeeded after %d retries" %
            (msg, MAX_RETRIES - retry_count))
      return val
    except Exception as err:
      if retry_count == 0:
        Log("Non-idempotent %s failed, aborting" % (msg, ))
        raise
      elif retry_count == 1:
        Log("Idempotent %s repeated failure, aborting" % (msg, ))
        raise
      else:
        Log("Idempotent %s failed, retry #%d/%d: %s" %
            (msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err))
        # BUGFIX: propagate the retried call's result. Previously the
        # recursive call's return value was dropped, so a retry that
        # eventually succeeded still returned None to the caller.
        return self.MaybeRetry(retry_count - 1, msg, fn, *args)
+
+ def _ExecOp(self, *ops):
"""Execute one or more opcodes and manage the exec buffer.
@result: if only opcode has been passed, we return its result;
else:
return results
+ def ExecOp(self, retry, *ops):
+ """Execute one or more opcodes and manage the exec buffer.
+
+ @result: if only one opcode has been passed, we return its result;
+ otherwise we return the list of results
+
+ """
+ if retry:
+ rval = MAX_RETRIES
+ else:
+ rval = 0
+ return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)
+
def ExecOrQueue(self, name, *ops):
"""Execute an opcode and manage the exec buffer."""
if self.opts.parallel:
self.queued_ops.append((ops, name))
else:
- return self.ExecOp(*ops)
+ return self.ExecOp(self.queue_retry, *ops)
+
+ def StartBatch(self, retry):
+ """Start a new batch of jobs.
+
+ @param retry: whether this is a retryable batch
+
+ """
+ self.queued_ops = []
+ self.queue_retry = retry
def CommitQueue(self):
"""Execute all submitted opcodes in case of parallel burnin"""
if not self.opts.parallel:
return
+ if self.queue_retry:
+ rval = MAX_RETRIES
+ else:
+ rval = 0
+
try:
- results = self.ExecJobSet(self.queued_ops)
+ results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
+ self.queued_ops)
finally:
self.queued_ops = []
return results
results = []
for jid, (_, iname) in zip(job_ids, jobs):
Log("waiting for job %s for %s" % (jid, iname), indent=2)
- results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback))
-
+ try:
+ results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback))
+ except Exception, err:
+ Log("Job for %s failed: %s" % (iname, err))
+ if len(results) != len(jobs):
+ raise BurninFailure()
return results
+ def _DoCheckInstances(fn):
+ """Decorator for checking instances.
+
+ """
+ def wrapper(self, *args, **kwargs):
+ val = fn(self, *args, **kwargs)
+ for instance in self.instances:
+ self._CheckInstanceAlive(instance)
+ return val
+
+ return wrapper
+
+ def _DoBatch(retry):
+ """Decorator for possible batch operations.
+
+ Must come after the _DoCheckInstances decorator (if any).
+
+ @param retry: whether this is a retryable batch, will be
+ passed to StartBatch
+
+ """
+ def wrap(fn):
+ def batched(self, *args, **kwargs):
+ self.StartBatch(retry)
+ val = fn(self, *args, **kwargs)
+ self.CommitQueue()
+ return val
+ return batched
+
+ return wrap
+
def ParseOptions(self):
"""Parses the command line options.
try:
op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
names=names, use_locking=True)
- result = self.ExecOp(op)
+ result = self.ExecOp(True, op)
except errors.GenericError, err:
err_code, msg = cli.FormatError(err)
Err(msg, exit_code=err_code)
self.nodes = [data[0] for data in result if not (data[1] or data[2])]
- result = self.ExecOp(opcodes.OpDiagnoseOS(output_fields=["name", "valid"],
- names=[]))
+ op_diagos = opcodes.OpDiagnoseOS(output_fields=["name", "valid"], names=[])
+ result = self.ExecOp(True, op_diagos)
if not result:
Err("Can't get the OS list")
if self.opts.os not in os_set:
Err("OS '%s' not found" % self.opts.os)
+ @_DoCheckInstances
+ @_DoBatch(False)
def BurnCreateInstances(self):
"""Create the given instances.
self.ExecOrQueue(instance, op)
self.to_rem.append(instance)
- self.CommitQueue()
-
- for instance in self.instances:
- self._CheckInstanceAlive(instance)
-
+ @_DoBatch(False)
def BurnGrowDisks(self):
"""Grow both the os and the swap disks by the requested amount, if any."""
Log("Growing disks")
amount=growth, wait_for_sync=True)
Log("increase disk/%s by %s MB" % (idx, growth), indent=2)
self.ExecOrQueue(instance, op)
- self.CommitQueue()
+ @_DoBatch(True)
def BurnReplaceDisks1D8(self):
"""Replace disks on primary and secondary for drbd8."""
Log("Replacing disks on the same nodes")
Log("run %s" % mode, indent=2)
ops.append(op)
self.ExecOrQueue(instance, *ops)
- self.CommitQueue()
+ @_DoBatch(True)
def BurnReplaceDisks2(self):
"""Replace secondary node."""
Log("Changing the secondary node")
disks=[i for i in range(self.disk_count)])
Log("run %s %s" % (mode, msg), indent=2)
self.ExecOrQueue(instance, op)
- self.CommitQueue()
+ @_DoCheckInstances
+ @_DoBatch(False)
def BurnFailover(self):
"""Failover the instances."""
Log("Failing over instances")
ignore_consistency=False)
self.ExecOrQueue(instance, op)
- self.CommitQueue()
- for instance in self.instances:
- self._CheckInstanceAlive(instance)
+ @_DoBatch(False)
def BurnMigrate(self):
"""Migrate the instances."""
Log("Migrating instances")
cleanup=True)
Log("migration and migration cleanup", indent=2)
self.ExecOrQueue(instance, op1, op2)
- self.CommitQueue()
+ @_DoCheckInstances
+ @_DoBatch(False)
def BurnImportExport(self):
"""Export the instance, delete it, and import it back.
# read the full name of the instance
nam_op = opcodes.OpQueryInstances(output_fields=["name"],
names=[instance], use_locking=True)
- full_name = self.ExecOp(nam_op)[0][0]
+ full_name = self.ExecOp(False, nam_op)[0][0]
if self.opts.iallocator:
pnode = snode = None
Log("remove export", indent=2)
self.ExecOrQueue(instance, exp_op, rem_op, imp_op, erem_op)
- self.CommitQueue()
- for instance in self.instances:
- self._CheckInstanceAlive(instance)
-
def StopInstanceOp(self, instance):
"""Stop given instance."""
return opcodes.OpShutdownInstance(instance_name=instance)
return opcodes.OpRenameInstance(instance_name=instance,
new_name=instance_new)
+ @_DoCheckInstances
+ @_DoBatch(True)
def BurnStopStart(self):
"""Stop/start the instances."""
Log("Stopping and starting instances")
op2 = self.StartInstanceOp(instance)
self.ExecOrQueue(instance, op1, op2)
- self.CommitQueue()
-
- for instance in self.instances:
- self._CheckInstanceAlive(instance)
-
+ @_DoBatch(False)
def BurnRemove(self):
"""Remove the instances."""
Log("Removing instances")
ignore_failures=True)
self.ExecOrQueue(instance, op)
- self.CommitQueue()
-
def BurnRename(self):
"""Rename the instances.
op_rename2 = self.RenameInstanceOp(rename, instance)
op_start1 = self.StartInstanceOp(rename)
op_start2 = self.StartInstanceOp(instance)
- self.ExecOp(op_stop1, op_rename1, op_start1)
+ self.ExecOp(False, op_stop1, op_rename1, op_start1)
self._CheckInstanceAlive(rename)
- self.ExecOp(op_stop2, op_rename2, op_start2)
+ self.ExecOp(False, op_stop2, op_rename2, op_start2)
self._CheckInstanceAlive(instance)
+ @_DoCheckInstances
+ @_DoBatch(True)
def BurnReinstall(self):
"""Reinstall the instances."""
Log("Reinstalling instances")
op4 = self.StartInstanceOp(instance)
self.ExecOrQueue(instance, op1, op2, op3, op4)
- self.CommitQueue()
-
- for instance in self.instances:
- self._CheckInstanceAlive(instance)
-
+ @_DoCheckInstances
+ @_DoBatch(True)
def BurnReboot(self):
"""Reboot the instances."""
Log("Rebooting instances")
ops.append(op)
self.ExecOrQueue(instance, *ops)
- self.CommitQueue()
-
- for instance in self.instances:
- self._CheckInstanceAlive(instance)
-
+ @_DoCheckInstances
+ @_DoBatch(True)
def BurnActivateDisks(self):
"""Activate and deactivate disks of the instances."""
Log("Activating/deactivating disks")
Log("activate disks when offline", indent=2)
Log("deactivate disks (when offline)", indent=2)
self.ExecOrQueue(instance, op_act, op_stop, op_act, op_deact, op_start)
- self.CommitQueue()
- for instance in self.instances:
- self._CheckInstanceAlive(instance)
+ @_DoCheckInstances
+ @_DoBatch(False)
def BurnAddRemoveDisks(self):
"""Add and remove an extra disk for the instances."""
Log("Adding and removing disks")
Log("adding a disk", indent=2)
Log("removing last disk", indent=2)
self.ExecOrQueue(instance, op_add, op_stop, op_rem, op_start)
- self.CommitQueue()
- for instance in self.instances:
- self._CheckInstanceAlive(instance)
+ @_DoBatch(False)
def BurnAddRemoveNICs(self):
"""Add and remove an extra NIC for the instances."""
Log("Adding and removing NICs")
Log("adding a NIC", indent=2)
Log("removing last NIC", indent=2)
self.ExecOrQueue(instance, op_add, op_rem)
- self.CommitQueue()
def _CheckInstanceAlive(self, instance):
"""Check if an instance is alive by doing http checks.
Log(self.GetFeedbackBuf())
Log("\n\n")
if not self.opts.keep_instances:
- self.BurnRemove()
+ try:
+ self.BurnRemove()
+ except Exception, err:
+ if has_err: # already detected errors, so errors in removal
+ # are quite expected
+ Log("Note: error detected during instance remove: %s" % str(err))
+ else: # non-expected error
+ raise
return 0
from ganeti.utils import RunCmd
from ganeti import constants
+from ganeti import cli
USAGE = ("\tlvmstrap diskinfo\n"
"\tlvmstrap [--vgname=NAME] [--allow-removable]"
devnum: the device number, e.g. 0x803 (2051 in decimal) for sda3
Returns:
- None; failure of the check is signalled by raising a
+ None; failure of the check is signaled by raising a
SysconfigError exception
"""
def DevInfo(name, dev, mountinfo):
- """Computes miscellaneous informations about a block device.
+ """Computes miscellaneous information about a block device.
Args:
name: the device name, e.g. sda
def ShowDiskInfo(opts):
"""Shows a nicely formatted block device list for this system.
- This function shows the user a table with the informations gathered
+ This function shows the user a table with the information gathered
by the other functions defined, in order to help the user make a
choice about which disks should be allocated to our volume group.
dlist = GetDiskList(opts)
print "------- Disk information -------"
- print ("%5s %7s %4s %5s %-10s %s" %
- ("Name", "Size[M]", "Used", "Mount", "LVM?", "Info"))
+ headers = {
+ "name": "Name",
+ "size": "Size[M]",
+ "used": "Used",
+ "mount": "Mount",
+ "lvm": "LVM?",
+ "info": "Info"
+ }
+ fields = ["name", "size", "used", "mount", "lvm", "info"]
flatlist = []
# Flatten the [(disk, [partition,...]), ...] list
for partname, partsize, partdev in parts:
flatlist.append((partname, partsize, partdev, ""))
+ strlist = []
for name, size, dev, in_use in flatlist:
mp, vgname, fileinfo = DevInfo(name, dev, mounts)
if mp is None:
if len(name) > 3:
# Indent partitions
name = " %s" % name
- print ("%-5s %7.2f %-4s %-5s %-10s %s" %
- (name, float(size) / 1024 / 1024, in_use, mp, lvminfo, fileinfo))
+
+ strlist.append([name, "%.2f" % (float(size) / 1024 / 1024),
+ in_use, mp, lvminfo, fileinfo])
+
+ data = cli.GenerateTable(headers, fields, None,
+ strlist, numfields=["size"])
+
+ for line in data:
+ print line
def CheckReread(name):