Merge commit 'origin/next'
authorIustin Pop <iustin@google.com>
Fri, 25 Sep 2009 12:22:58 +0000 (14:22 +0200)
committerIustin Pop <iustin@google.com>
Fri, 25 Sep 2009 12:22:58 +0000 (14:22 +0200)
* commit 'origin/next': (74 commits)
  Fix gnt-node modify online help
  Fix gnt-job info entry in gnt-job(8)
  locking: Don't swallow exceptions
  Add check for duplicate MACs in instance add
  scripts/gnt-node: fix a help string
  Optimise multi-job submit
  Extend gnt-debug with more debugging options
  Return cluster tags from LUQueryClusterInfo
  Add script to clean archived jobs after 21 days
  rapi: export more static node information
  Pass the correct signal to handlers
  cli: Use ToStdout/ToStderr instead of print
  Fix small typo in gnt-node
  Simplify handling of boolean args in rapi
  Fix checks in LUSetNodeParms for the master node
  Improve the example startup script
  Fix insserv dependencies
  Fix a typo in InitCluster
  Ignore results from drained nodes in iallocator
  Ship the ethers hook
  ...

52 files changed:
.gitignore
Makefile.am
daemons/ganeti-cleaner.in [new file with mode: 0755]
daemons/ganeti-masterd
daemons/ganeti-noded
daemons/ganeti-rapi
doc/examples/ganeti.cron.in
doc/examples/ganeti.initd.in
doc/examples/hooks/ethers [new file with mode: 0755]
doc/hooks.rst
doc/iallocator.rst
lib/backend.py
lib/bdev.py
lib/bootstrap.py
lib/cli.py
lib/cmdlib.py
lib/config.py
lib/constants.py
lib/daemon.py
lib/http/__init__.py
lib/http/auth.py
lib/http/client.py
lib/http/server.py
lib/hypervisor/hv_fake.py
lib/hypervisor/hv_xen.py
lib/jqueue.py
lib/jstore.py
lib/locking.py
lib/luxi.py
lib/mcpu.py
lib/objects.py
lib/opcodes.py
lib/rapi/baserlib.py
lib/rapi/rlib2.py
lib/rpc.py
lib/ssconf.py
lib/ssh.py
lib/utils.py
man/gnt-debug.sgml
man/gnt-instance.sgml
man/gnt-job.sgml
man/gnt-node.sgml
pylintrc [new file with mode: 0644]
qa/qa_rapi.py
scripts/gnt-cluster
scripts/gnt-debug
scripts/gnt-instance
scripts/gnt-job
scripts/gnt-node
test/ganeti.config_unittest.py
tools/burnin
tools/lvmstrap

index 0b4512b..cff682b 100644 (file)
@@ -6,6 +6,7 @@
 
 # global ignores
 *.py[co]
+*.swp
 
 # /
 /Makefile
@@ -26,6 +27,9 @@
 /*.tar.bz2
 /*.tar.gz
 
+# daemons
+/daemons/ganeti-cleaner
+
 # devel
 /devel/clean-cluster
 /devel/upload
index 2a24662..062f827 100644 (file)
@@ -26,6 +26,7 @@ DIRS = \
        devel \
        doc \
        doc/examples \
+       doc/examples/hooks \
        lib \
        lib/http \
        lib/hypervisor \
@@ -44,6 +45,7 @@ MAINTAINERCLEANFILES = \
 
 CLEANFILES = \
        autotools/replace_vars.sed \
+       daemons/ganeti-cleaner \
        devel/upload \
        doc/rapi-resources.gen \
        doc/examples/bash_completion \
@@ -138,6 +140,9 @@ dist_sbin_SCRIPTS = \
        scripts/gnt-node \
        scripts/gnt-os
 
+nodist_sbin_SCRIPTS = \
+       daemons/ganeti-cleaner
+
 dist_tools_SCRIPTS = \
        tools/burnin \
        tools/cfgshell \
@@ -148,7 +153,9 @@ EXTRA_DIST = \
        $(MAINTAINERCLEANFILES) \
        NEWS \
        DEVNOTES \
+       pylintrc \
        autotools/docbook-wrapper \
+       daemons/ganeti-cleaner.in \
        devel/upload.in \
        $(docrst) \
        $(docdot) \
@@ -157,6 +164,7 @@ EXTRA_DIST = \
        doc/examples/ganeti.initd.in \
        doc/examples/ganeti.cron.in \
        doc/examples/dumb-allocator \
+       doc/examples/hooks/ethers \
        doc/locking.txt \
        test/testutils.py \
        test/mocks.py \
@@ -241,6 +249,11 @@ doc/examples/%: doc/examples/%.in stamp-directories \
                $(REPLACE_VARS_SED)
        sed -f $(REPLACE_VARS_SED) < $< > $@
 
+daemons/ganeti-cleaner: daemons/ganeti-cleaner.in stamp-directories \
+               $(REPLACE_VARS_SED)
+       sed -f $(REPLACE_VARS_SED) < $< > $@
+       chmod +x $@
+
 doc/%.html: doc/%.rst
        @test -n "$(RST2HTML)" || { echo 'rst2html' not found during configure; exit 1; }
        $(RST2HTML) $< $@
diff --git a/daemons/ganeti-cleaner.in b/daemons/ganeti-cleaner.in
new file mode 100755 (executable)
index 0000000..1815e6d
--- /dev/null
@@ -0,0 +1,39 @@
+#!/bin/bash
+#
+
+# Copyright (C) 2009 Google Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+
+set -e
+
+DATA_DIR=@LOCALSTATEDIR@/lib/ganeti
+QUEUE_ARCHIVE_DIR=$DATA_DIR/queue/archive
+
+# Define how many days archived jobs should be left alone
+REMOVE_AFTER=21
+
+# Exit if machine is not part of a cluster
+[[ -e $DATA_DIR/ssconf_master_node ]] || echo 0
+
+# Exit if queue archive directory doesn't exist
+[[ -d $QUEUE_ARCHIVE_DIR ]] || exit 0
+
+# Remove old jobs
+find $QUEUE_ARCHIVE_DIR -mindepth 2 -type f -mtime +$REMOVE_AFTER -print0 | \
+xargs -r0 rm -f
+
+exit 0
index ca864e9..d81b944 100755 (executable)
@@ -28,13 +28,10 @@ inheritance from parent classes requires it.
 
 
 import os
-import errno
 import sys
 import SocketServer
 import time
 import collections
-import Queue
-import random
 import signal
 import logging
 
@@ -195,6 +192,7 @@ class ClientRqHandler(SocketServer.BaseRequestHandler):
 
   def send_message(self, msg):
     #print "sending", msg
+    # TODO: sendall is not guaranteed to send everything
     self.request.sendall(msg + self.EOM)
 
 
@@ -213,6 +211,13 @@ class ClientOps:
       ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
       return queue.SubmitJob(ops)
 
+    if method == luxi.REQ_SUBMIT_MANY_JOBS:
+      logging.info("Received multiple jobs")
+      jobs = []
+      for ops in args:
+        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
+      return queue.SubmitManyJobs(jobs)
+
     elif method == luxi.REQ_CANCEL_JOB:
       job_id = args
       logging.info("Received job cancel request for %s", job_id)
@@ -465,7 +470,6 @@ def main():
   """Main function"""
 
   options, args = ParseOptions()
-  utils.debug = options.debug
   utils.no_fork = True
 
   if options.fork:
@@ -516,7 +520,7 @@ def main():
     rpc.Init()
     try:
       # activate ip
-      master_node = ssconf.SimpleConfigReader().GetMasterNode()
+      master_node = ssconf.SimpleStore().GetMasterNode()
       if not rpc.RpcRunner.call_node_start_master(master_node, False, False):
         logging.error("Can't activate master IP address")
 
index 36e8e70..cc60f87 100755 (executable)
@@ -26,9 +26,7 @@
 
 import os
 import sys
-import traceback
 import SocketServer
-import errno
 import logging
 import signal
 
@@ -754,7 +752,6 @@ def main():
   global queue_lock
 
   options, args = ParseOptions()
-  utils.debug = options.debug
 
   if options.fork:
     utils.CloseFDs()
@@ -762,13 +759,9 @@ def main():
   for fname in (constants.SSL_CERT_FILE,):
     if not os.path.isfile(fname):
       print "config %s not there, will not run." % fname
-      sys.exit(5)
+      sys.exit(constants.EXIT_NOTCLUSTER)
 
-  try:
-    port = utils.GetNodeDaemonPort()
-  except errors.ConfigurationError, err:
-    print "Cluster configuration incomplete: '%s'" % str(err)
-    sys.exit(5)
+  port = utils.GetNodeDaemonPort()
 
   dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
   dirs.append((constants.LOG_OS_DIR, 0750))
index 7b9710a..2288f17 100755 (executable)
@@ -206,17 +206,20 @@ def ParseOptions():
   parser.add_option("-f", "--foreground", dest="fork",
                     help="Don't detach from the current terminal",
                     default=True, action="store_false")
+  parser.add_option("-b", "--bind", dest="bind_address",
+                     help="Bind address",
+                     default="", metavar="ADDRESS")
 
   options, args = parser.parse_args()
 
   if len(args) != 0:
     print >> sys.stderr, "Usage: %s [-d] [-p port]" % sys.argv[0]
-    sys.exit(1)
+    sys.exit(constants.EXIT_FAILURE)
 
   if options.ssl and not (options.ssl_cert and options.ssl_key):
     print >> sys.stderr, ("For secure mode please provide "
-                         "--ssl-key and --ssl-cert arguments")
-    sys.exit(1)
+                          "--ssl-key and --ssl-cert arguments")
+    sys.exit(constants.EXIT_FAILURE)
 
   return options, args
 
@@ -237,7 +240,7 @@ def main():
                                       ssl_cert_path=options.ssl_cert)
     except Exception, err:
       sys.stderr.write("Can't load the SSL certificate/key: %s\n" % (err,))
-      sys.exit(1)
+      sys.exit(constants.EXIT_FAILURE)
   else:
     ssl_params = None
 
@@ -252,7 +255,7 @@ def main():
   utils.WritePidFile(constants.RAPI_PID)
   try:
     mainloop = daemon.Mainloop()
-    server = RemoteApiHttpServer(mainloop, "", options.port,
+    server = RemoteApiHttpServer(mainloop, options.bind_address, options.port,
                                  ssl_params=ssl_params, ssl_verify_peer=False,
                                  request_executor_class=
                                  JsonErrorRequestExecutor)
index 155411a..6d7d9b0 100644 (file)
@@ -1,3 +1,7 @@
 PATH=/sbin:/bin:/usr/sbin:/usr/bin:/usr/local/sbin:/usr/local/bin
-# restart failed instances
+
+# Restart failed instances (every 5 minutes)
 */5 * * * * root [ -x @SBINDIR@/ganeti-watcher ] && @SBINDIR@/ganeti-watcher
+
+# Clean job archive (at 01:45 AM)
+45 1 * * * root [ -x @SBINDIR@/ganeti-cleaner ] && @SBINDIR@/ganeti-cleaner
index b346530..d77595e 100644 (file)
@@ -3,12 +3,12 @@
 # based on skeleton from Debian GNU/Linux
 ### BEGIN INIT INFO
 # Provides:          ganeti
-# Required-Start:    $syslog $remote_fs xend
-# Required-Stop:     $syslog $remote_fs xend
+# Required-Start:    $syslog $remote_fs
+# Required-Stop:     $syslog $remote_fs
 # Default-Start:     2 3 4 5
 # Default-Stop:      0 1 6
-# Short-Description: Ganeti Xen Cluster Manager
-# Description:       Ganeti Xen Cluster Manager
+# Short-Description: Ganeti Cluster Manager
+# Description:       Ganeti Cluster Manager
 ### END INIT INFO
 
 PATH=/sbin:/bin:/usr/sbin:/usr/bin:/usr/local/sbin:/usr/local/bin
@@ -18,24 +18,18 @@ GANETIRUNDIR="@LOCALSTATEDIR@/run/ganeti"
 
 GANETI_DEFAULTS_FILE="@SYSCONFDIR@/default/ganeti"
 
-NODED_NAME="ganeti-noded"
-NODED="@PREFIX@/sbin/${NODED_NAME}"
-NODED_PID="${GANETIRUNDIR}/${NODED_NAME}.pid"
+NODED="ganeti-noded"
 NODED_ARGS=""
 
-MASTERD_NAME="ganeti-masterd"
-MASTERD="@PREFIX@/sbin/${MASTERD_NAME}"
-MASTERD_PID="${GANETIRUNDIR}/${MASTERD_NAME}.pid"
+MASTERD="ganeti-masterd"
 MASTERD_ARGS=""
 
-RAPI_NAME="ganeti-rapi"
-RAPI="@PREFIX@/sbin/${RAPI_NAME}"
-RAPI_PID="${GANETIRUNDIR}/${RAPI_NAME}.pid"
+RAPI="ganeti-rapi"
 RAPI_ARGS=""
 
 SCRIPTNAME="@SYSCONFDIR@/init.d/ganeti"
 
-test -f $NODED || exit 0
+test -f "@PREFIX@/sbin/$NODED" || exit 0
 
 . /lib/lsb/init-functions
 
@@ -71,47 +65,66 @@ check_exitcode() {
 }
 
 start_action() {
-    # called as start_action daemon pidfile
+    # called as start_action daemon-name
     local daemon="$1"; shift
-    local pidfile="$1"; shift
     log_action_begin_msg "$daemon"
-    start-stop-daemon --start --quiet --exec "$daemon" --pidfile "$pidfile" \
+    start-stop-daemon --start --quiet \
+        --pidfile "${GANETIRUNDIR}/${daemon}.pid" \
+        --startas "@PREFIX@/sbin/$daemon" \
+        --oknodo \
         -- "$@"
     check_exitcode $?
 }
 
 stop_action() {
-    # called as stop_action daemon pidfile
-    log_action_begin_msg "$1"
+    # called as stop_action daemon-name
+    local daemon="$1"
+    log_action_begin_msg "$daemon"
     start-stop-daemon --stop --quiet --oknodo \
-        --retry 30 --pidfile "$2"
+        --retry 30 --pidfile "${GANETIRUNDIR}/${daemon}.pid"
     check_exitcode $?
 }
 
+maybe_do() {
+    requested="$1"; shift
+    action="$1"; shift
+    target="$1"
+    if [ -z "$requested" -o "$requested" = "$target" ]; then
+        $action "$@"
+    fi
+}
+
+if [ -n "$2" -a \
+    "$2" != "$NODED" -a \
+    "$2" != "$MASTERD" -a \
+    "$2" != "$RAPI" ]; then
+    log_failure_msg "Unknown daemon '$2' requested"
+    exit 1
+fi
 
 case "$1" in
     start)
-        log_daemon_msg "Starting $DESC" "$NAME"
+        log_daemon_msg "Starting $DESC" "$2"
         check_config
-        start_action $NODED $NODED_PID $NODED_ARGS
-        start_action $MASTERD $MASTERD_PID $MASTERD_ARGS
-        start_action $RAPI $RAPI_PID $RAPI_ARGS
+        maybe_do "$2" start_action $NODED $NODED_ARGS
+        maybe_do "$2" start_action $MASTERD $MASTERD_ARGS
+        maybe_do "$2" start_action $RAPI $RAPI_ARGS
         ;;
     stop)
-        log_daemon_msg "Stopping $DESC" "$NAME"
-        stop_action $RAPI $RAPI_PID
-        stop_action $MASTERD $MASTERD_PID
-        stop_action $NODED $NODED_PID
+        log_daemon_msg "Stopping $DESC" "$2"
+        maybe_do "$2" stop_action $RAPI
+        maybe_do "$2" stop_action $MASTERD
+        maybe_do "$2" stop_action $NODED
         ;;
     restart|force-reload)
-        log_daemon_msg "Reloading $DESC"
-        stop_action $RAPI $RAPI_PID
-        stop_action $MASTERD $MASTERD_PID
-        stop_action $NODED $NODED_PID
+        log_daemon_msg "Reloading $DESC" "$2"
+        maybe_do "$2" stop_action $RAPI
+        maybe_do "$2" stop_action $MASTERD
+        maybe_do "$2" stop_action $NODED
         check_config
-        start_action $NODED $NODED_PID
-        start_action $MASTERD $MASTERD_PID
-        start_action $RAPI $RAPI_PID
+        maybe_do "$2" start_action $NODED $NODED_ARGS
+        maybe_do "$2" start_action $MASTERD $MASTERD_ARGS
+        maybe_do "$2" start_action $RAPI $RAPI_ARGS
         ;;
     *)
         log_success_msg "Usage: $SCRIPTNAME {start|stop|force-reload|restart}"
diff --git a/doc/examples/hooks/ethers b/doc/examples/hooks/ethers
new file mode 100755 (executable)
index 0000000..387ed7e
--- /dev/null
@@ -0,0 +1,88 @@
+#!/bin/bash
+
+# Copyright (C) 2009 Google Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+
+# This is an example ganeti hook that writes the instance mac addresses in the
+# node's /etc/ether file. It will pic up the first nic connected to the
+# TARGET_BRIDGE bridge, and write it down with the syntax "MAC INSTANCE_NAME".
+
+# The hook will also send a HUP signal the daemon whose PID is in
+# DAEMON_PID_FILE, so that it can load the new /etc/ethers file and use it.
+# This has been tested in conjunction with dnsmasq's dhcp implementation.
+
+# It will also remove any other occurrences for the same instance in the
+# aformentioned file. This hook supports the "instance-add", "instance-modify"
+# "instance-remove", and "instance-mirror-replace" ganeti post hook paths. To
+# install it add a symlink from those hooks' directories to where this file is
+# installed (with a mode which permits execution).
+
+# TARGET_BRIDGE: We'll only add the first nic which gets connected to this
+# bridge to /etc/ethers.
+TARGET_BRIDGE="br0"
+DAEMON_PID_FILE="/var/run/dnsmasq.pid"
+
+# In order to handle concurrent execution of this lock, we use the $LOCKFILE.
+# LOCKFILE_CREATE and LOCKFILE_REMOVE are the path names for the lockfile-progs
+# programs which we use as helpers.
+LOCKFILE="/var/lock/ganeti_ethers"
+LOCKFILE_CREATE="/usr/bin/lockfile-create"
+LOCKFILE_REMOVE="/usr/bin/lockfile-remove"
+
+hooks_path=$GANETI_HOOKS_PATH
+[ -n "$hooks_path" ] || exit 1
+instance=$GANETI_INSTANCE_NAME
+[ -n "$instance" ] || exit 1
+nic_count=$GANETI_INSTANCE_NIC_COUNT
+
+acquire_lockfile() {
+  $LOCKFILE_CREATE $LOCKFILE || exit 1
+  trap "$LOCKFILE_REMOVE $LOCKFILE" EXIT
+}
+
+update_ethers_from_new() {
+  chmod 644 /etc/ethers.new
+  mv /etc/ethers.new /etc/ethers
+  [ -f "$DAEMON_PID_FILE" ] && kill -HUP $(< $DAEMON_PID_FILE)
+}
+
+if [ "$hooks_path" = "instance-add" -o \
+     "$hooks_path" = "instance-modify" -o \
+     "$hooks_path" = "instance-mirror-replace" ]
+then
+  for i in $(seq 0 $((nic_count - 1)) ); do
+    bridge_var="GANETI_INSTANCE_NIC${i}_BRIDGE"
+    bridge=${!bridge_var}
+    if [ -n "$bridge" -a "$bridge" = "$TARGET_BRIDGE" ]; then
+      mac_var="GANETI_INSTANCE_NIC${i}_MAC"
+      mac=${!mac_var}
+      acquire_lockfile
+      cat /etc/ethers | awk -- "! /^([[:xdigit:]:]*)[[:blank:]]+$instance\>/;
+      END {print \"$mac\t$instance\"}" > /etc/ethers.new
+      update_ethers_from_new
+      break
+    fi
+  done
+fi
+if [ "$hooks_path" = "instance-remove" -o \
+       \( "$hooks_path" = "instance-modify" -a "$nic_count" -eq 0 \) ]; then
+  acquire_lockfile
+  cat /etc/ethers | awk -- "! /^([[:xdigit:]:]*)[[:blank:]]+$instance\>/" \
+    > /etc/ethers.new
+  update_ethers_from_new
+fi
+
index 7dbe7d5..b2f05ce 100644 (file)
@@ -104,7 +104,7 @@ The scripts will be run as follows:
   be left
 
 
-All informations about the cluster is passed using environment
+All information about the cluster is passed using environment
 variables. Different operations will have sligthly different
 environments, but most of the variables are common.
 
index f4b8bfc..408b908 100644 (file)
@@ -176,7 +176,7 @@ instances
 
 nodes
   dictionary with the data for the nodes in the cluster, indexed by
-  the node name; the dict contains:
+  the node name; the dict contains [*]_ :
 
   total_disk
     the total disk size of this node (mebibytes)
@@ -225,15 +225,19 @@ nodes
   or ``offline`` flags set. More details about these of node status
   flags is available in the manpage *ganeti(7)*.
 
+.. [*] Note that no run-time data is present for offline or drained nodes;
+   this means the tags total_memory, reserved_memory, free_memory, total_disk,
+   free_disk, total_cpus, i_pri_memory and i_pri_up memory will be absent
 
-Respone message
-~~~~~~~~~~~~~~~
+
+Response message
+~~~~~~~~~~~~~~~~
 
 The response message is much more simple than the input one. It is
 also a dict having three keys:
 
 success
-  a boolean value denoting if the allocation was successfull or not
+  a boolean value denoting if the allocation was successful or not
 
 info
   a string with information from the scripts; if the allocation fails,
index 2ef35fb..f4e308e 100644 (file)
 # 02110-1301, USA.
 
 
-"""Functions used by the node daemon"""
+"""Functions used by the node daemon
+
+@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
+     the L{UploadFile} function
+
+"""
 
 
 import os
@@ -115,6 +120,23 @@ def _CleanDirectory(path, exclude=None):
       utils.RemoveFile(full_name)
 
 
+def _BuildUploadFileList():
+  """Build the list of allowed upload files.
+
+  This is abstracted so that it's built only once at module import time.
+
+  """
+  return frozenset([
+      constants.CLUSTER_CONF_FILE,
+      constants.ETC_HOSTS,
+      constants.SSH_KNOWN_HOSTS_FILE,
+      constants.VNC_PASSWORD_FILE,
+      ])
+
+
+_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
+
+
 def JobQueuePurge():
   """Removes job queue files and archived jobs.
 
@@ -141,7 +163,7 @@ def GetMasterInfo():
     master_netdev = cfg.GetMasterNetdev()
     master_ip = cfg.GetMasterIP()
     master_node = cfg.GetMasterNode()
-  except errors.ConfigurationError, err:
+  except errors.ConfigurationError:
     logging.exception("Cluster configuration incomplete")
     return (None, None, None)
   return (master_netdev, master_ip, master_node)
@@ -320,7 +342,7 @@ def LeaveCluster():
 
 
 def GetNodeInfo(vgname, hypervisor_type):
-  """Gives back a hash with different informations about the node.
+  """Gives back a hash with different information about the node.
 
   @type vgname: C{string}
   @param vgname: the name of the volume group to ask for disk space information
@@ -585,7 +607,7 @@ def GetInstanceList(hypervisor_list):
     try:
       names = hypervisor.GetHypervisor(hname).ListInstances()
       results.extend(names)
-    except errors.HypervisorError, err:
+    except errors.HypervisorError:
       logging.exception("Error enumerating instances for hypevisor %s", hname)
       raise
 
@@ -593,7 +615,7 @@ def GetInstanceList(hypervisor_list):
 
 
 def GetInstanceInfo(instance, hname):
-  """Gives back the informations about an instance as a dictionary.
+  """Gives back the information about an instance as a dictionary.
 
   @type instance: string
   @param instance: the instance name
@@ -758,7 +780,7 @@ def RunRenameInstance(instance, old_name):
 
 
 def _GetVGInfo(vg_name):
-  """Get informations about the volume group.
+  """Get information about the volume group.
 
   @type vg_name: str
   @param vg_name: the volume group which we query
@@ -930,7 +952,7 @@ def InstanceShutdown(instance):
   # test every 10secs for 2min
 
   time.sleep(1)
-  for dummy in range(11):
+  for _ in range(11):
     if instance.name not in GetInstanceList([hv_name]):
       break
     time.sleep(10)
@@ -1044,7 +1066,7 @@ def AcceptInstance(instance, info, target):
     msg = "Failed to accept instance"
     logging.exception(msg)
     return (False, '%s: %s' % (msg, err))
-  return (True, "Accept successfull")
+  return (True, "Accept successful")
 
 
 def FinalizeMigration(instance, info, success):
@@ -1092,7 +1114,7 @@ def MigrateInstance(instance, target, live):
     msg = "Failed to migrate instance"
     logging.exception(msg)
     return (False, "%s: %s" % (msg, err))
-  return (True, "Migration successfull")
+  return (True, "Migration successful")
 
 
 def BlockdevCreate(disk, size, owner, on_primary, info):
@@ -1285,7 +1307,7 @@ def BlockdevAssemble(disk, owner, as_primary):
 def BlockdevShutdown(disk):
   """Shut down a block device.
 
-  First, if the device is assembled (Attach() is successfull), then
+  First, if the device is assembled (Attach() is successful), then
   the device is shutdown. Then the children of the device are
   shutdown.
 
@@ -1403,7 +1425,7 @@ def BlockdevGetmirrorstatus(disks):
 def _RecursiveFindBD(disk):
   """Check if a device is activated.
 
-  If so, return informations about the real device.
+  If so, return information about the real device.
 
   @type disk: L{objects.Disk}
   @param disk: the disk object we need to find
@@ -1423,7 +1445,7 @@ def _RecursiveFindBD(disk):
 def BlockdevFind(disk):
   """Check if a device is activated.
 
-  If it is, return informations about the real device.
+  If it is, return information about the real device.
 
   @type disk: L{objects.Disk}
   @param disk: the disk to find
@@ -1498,14 +1520,7 @@ def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
                   file_name)
     return False
 
-  allowed_files = [
-    constants.CLUSTER_CONF_FILE,
-    constants.ETC_HOSTS,
-    constants.SSH_KNOWN_HOSTS_FILE,
-    constants.VNC_PASSWORD_FILE,
-    ]
-
-  if file_name not in allowed_files:
+  if file_name not in _ALLOWED_UPLOAD_FILES:
     logging.error("Filename passed to UploadFile not in allowed"
                  " upload targets: '%s'", file_name)
     return False
@@ -2062,7 +2077,7 @@ def BlockdevRename(devlist):
         # but we don't have the owner here - maybe parse from existing
         # cache? for now, we only lose lvm data when we rename, which
         # is less critical than DRBD or MD
-    except errors.BlockDeviceError, err:
+    except errors.BlockDeviceError:
       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
       result = False
   return result
@@ -2132,7 +2147,7 @@ def RemoveFileStorageDir(file_storage_dir):
   @param file_storage_dir: the directory we should cleanup
   @rtype: tuple (success,)
   @return: tuple of one element, C{success}, denoting
-      whether the operation was successfull
+      whether the operation was successful
 
   """
   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
@@ -2147,7 +2162,7 @@ def RemoveFileStorageDir(file_storage_dir):
       # deletes dir only if empty, otherwise we want to return False
       try:
         os.rmdir(file_storage_dir)
-      except OSError, err:
+      except OSError:
         logging.exception("Cannot remove file storage directory '%s'",
                           file_storage_dir)
         result = False,
@@ -2176,7 +2191,7 @@ def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
       if os.path.isdir(old_file_storage_dir):
         try:
           os.rename(old_file_storage_dir, new_file_storage_dir)
-        except OSError, err:
+        except OSError:
           logging.exception("Cannot rename '%s' to '%s'",
                             old_file_storage_dir, new_file_storage_dir)
           result =  False,
@@ -2582,7 +2597,7 @@ class HooksRunner(object):
     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
     try:
       dir_contents = utils.ListVisibleFiles(dir_name)
-    except OSError, err:
+    except OSError:
       # FIXME: must log output in case of failures
       return rr
 
@@ -2705,7 +2720,7 @@ class DevCacheManager(object):
     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
     try:
       utils.WriteFile(fpath, data=fdata)
-    except EnvironmentError, err:
+    except EnvironmentError:
       logging.exception("Can't update bdev cache for %s", dev_path)
 
   @classmethod
@@ -2727,5 +2742,5 @@ class DevCacheManager(object):
     fpath = cls._ConvertPath(dev_path)
     try:
       utils.RemoveFile(fpath)
-    except EnvironmentError, err:
+    except EnvironmentError:
       logging.exception("Can't update bdev cache for %s", dev_path)
index 5c2c8bf..131fbcf 100644 (file)
@@ -161,7 +161,7 @@ class BlockDev(object):
     """Remove this device.
 
     This makes sense only for some of the device types: LV and file
-    storeage. Also note that if the device can't attach, the removal
+    storage. Also note that if the device can't attach, the removal
     can't be completed.
 
     """
@@ -486,7 +486,7 @@ class LogicalVolume(BlockDev):
   def Assemble(self):
     """Assemble the device.
 
-    We alway run `lvchange -ay` on the LV to ensure it's active before
+    We always run `lvchange -ay` on the LV to ensure it's active before
     use, as there were cases when xenvg was not active after boot
     (also possibly after disk issues).
 
@@ -1310,14 +1310,14 @@ class DRBD8(BaseDRBD):
 
 
     If sync_percent is None, it means all is ok
-    If estimated_time is None, it means we can't esimate
+    If estimated_time is None, it means we can't estimate
     the time needed, otherwise it's the time left in seconds.
 
 
     We set the is_degraded parameter to True on two conditions:
     network not connected or local disk missing.
 
-    We compute the ldisk parameter based on wheter we have a local
+    We compute the ldisk parameter based on whether we have a local
     disk or not.
 
     @rtype: tuple
@@ -1387,14 +1387,14 @@ class DRBD8(BaseDRBD):
 
     ever_disconnected = _IgnoreError(self._ShutdownNet, self.minor)
     timeout_limit = time.time() + self._NET_RECONFIG_TIMEOUT
-    sleep_time = 0.100 # we start the retry time at 100 miliseconds
+    sleep_time = 0.100 # we start the retry time at 100 milliseconds
     while time.time() < timeout_limit:
       status = self.GetProcStatus()
       if status.is_standalone:
         break
       # retry the disconnect, it seems possible that due to a
       # well-time disconnect on the peer, my disconnect command might
-      # be ingored and forgotten
+      # be ignored and forgotten
       ever_disconnected = _IgnoreError(self._ShutdownNet, self.minor) or \
                           ever_disconnected
       time.sleep(sleep_time)
@@ -1700,7 +1700,7 @@ class FileStorage(BlockDev):
   def Shutdown(self):
     """Shutdown the device.
 
-    This is a no-op for the file type, as we don't deacivate
+    This is a no-op for the file type, as we don't deactivate
     the file on shutdown.
 
     """
index 3f96561..4fce0d3 100644 (file)
@@ -79,24 +79,27 @@ def _GenerateSelfSignedSslCert(file_name, validity=(365 * 5)):
   """
   (fd, tmp_file_name) = tempfile.mkstemp(dir=os.path.dirname(file_name))
   try:
-    # Set permissions before writing key
-    os.chmod(tmp_file_name, 0600)
-
-    result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
-                           "-days", str(validity), "-nodes", "-x509",
-                           "-keyout", tmp_file_name, "-out", tmp_file_name,
-                           "-batch"])
-    if result.failed:
-      raise errors.OpExecError("Could not generate SSL certificate, command"
-                               " %s had exitcode %s and error message %s" %
-                               (result.cmd, result.exit_code, result.output))
-
-    # Make read-only
-    os.chmod(tmp_file_name, 0400)
-
-    os.rename(tmp_file_name, file_name)
+    try:
+      # Set permissions before writing key
+      os.chmod(tmp_file_name, 0600)
+
+      result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
+                             "-days", str(validity), "-nodes", "-x509",
+                             "-keyout", tmp_file_name, "-out", tmp_file_name,
+                             "-batch"])
+      if result.failed:
+        raise errors.OpExecError("Could not generate SSL certificate, command"
+                                 " %s had exitcode %s and error message %s" %
+                                 (result.cmd, result.exit_code, result.output))
+
+      # Make read-only
+      os.chmod(tmp_file_name, 0400)
+
+      os.rename(tmp_file_name, file_name)
+    finally:
+      utils.RemoveFile(tmp_file_name)
   finally:
-    utils.RemoveFile(tmp_file_name)
+    os.close(fd)
 
 
 def _InitGanetiServerSetup():
@@ -123,7 +126,8 @@ def _InitGanetiServerSetup():
 def InitCluster(cluster_name, mac_prefix, def_bridge,
                 master_netdev, file_storage_dir, candidate_pool_size,
                 secondary_ip=None, vg_name=None, beparams=None, hvparams=None,
-                enabled_hypervisors=None, default_hypervisor=None):
+                enabled_hypervisors=None, default_hypervisor=None,
+                modify_etc_hosts=True):
   """Initialise the cluster.
 
   @type candidate_pool_size: int
@@ -134,6 +138,14 @@ def InitCluster(cluster_name, mac_prefix, def_bridge,
   if config.ConfigWriter.IsCluster():
     raise errors.OpPrereqError("Cluster is already initialised")
 
+  if not enabled_hypervisors:
+    raise errors.OpPrereqError("Enabled hypervisors list must contain at"
+                               " least one member")
+  invalid_hvs = set(enabled_hypervisors) - constants.HYPER_TYPES
+  if invalid_hvs:
+    raise errors.OpPrereqError("Enabled hypervisors contains invalid"
+                               " entries: %s" % invalid_hvs)
+
   hostname = utils.HostInfo()
 
   if hostname.ip.startswith("127."):
@@ -225,7 +237,9 @@ def InitCluster(cluster_name, mac_prefix, def_bridge,
     f.close()
   sshkey = sshline.split(" ")[1]
 
-  utils.AddHostToEtcHosts(hostname.name)
+  if modify_etc_hosts:
+    utils.AddHostToEtcHosts(hostname.name)
+
   _InitSSHSetup()
 
   # init of cluster config file
@@ -247,6 +261,7 @@ def InitCluster(cluster_name, mac_prefix, def_bridge,
     beparams={constants.BEGR_DEFAULT: beparams},
     hvparams=hvparams,
     candidate_pool_size=candidate_pool_size,
+    modify_etc_hosts=modify_etc_hosts,
     )
   master_node_config = objects.Node(name=hostname.name,
                                     primary_ip=hostname.ip,
@@ -483,7 +498,7 @@ def GatherMasterVotes(node_list):
 
   @type node_list: list
   @param node_list: the list of nodes to query for master info; the current
-      node wil be removed if it is in the list
+      node will be removed if it is in the list
   @rtype: list
   @return: list of (node, votes)
 
index d351f2f..8ba7198 100644 (file)
@@ -41,6 +41,7 @@ from ganeti import rpc
 from optparse import (OptionParser, make_option, TitledHelpFormatter,
                       Option, OptionValueError)
 
+
 __all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain",
            "SubmitOpCode", "GetClient",
            "cli_option", "ikv_option", "keyval_option",
@@ -55,6 +56,7 @@ __all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain",
            ]
 
 
+
 def _ExtractTagsObject(opts, args):
   """Extract the tag type object.
 
@@ -120,7 +122,7 @@ def ListTags(opts, args):
   result = list(result)
   result.sort()
   for tag in result:
-    print tag
+    ToStdout(tag)
 
 
 def AddTags(opts, args):
@@ -320,7 +322,7 @@ keyval_option = KeyValOption
 def _ParseArgs(argv, commands, aliases):
   """Parser for the command line arguments.
 
-  This function parses the arguements and returns the function which
+  This function parses the arguments and returns the function which
   must be executed together with its (modified) arguments.
 
   @param argv: the command line
@@ -335,7 +337,7 @@ def _ParseArgs(argv, commands, aliases):
     binary = argv[0].split("/")[-1]
 
   if len(argv) > 1 and argv[1] == "--version":
-    print "%s (ganeti) %s" % (binary, constants.RELEASE_VERSION)
+    ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
     # Quit right away. That way we don't have to care about this special
     # argument. optparse.py does it the same.
     sys.exit(0)
@@ -345,22 +347,27 @@ def _ParseArgs(argv, commands, aliases):
     # let's do a nice thing
     sortedcmds = commands.keys()
     sortedcmds.sort()
-    print ("Usage: %(bin)s {command} [options...] [argument...]"
-           "\n%(bin)s <command> --help to see details, or"
-           " man %(bin)s\n" % {"bin": binary})
+
+    ToStdout("Usage: %s {command} [options...] [argument...]", binary)
+    ToStdout("%s <command> --help to see details, or man %s", binary, binary)
+    ToStdout("")
+
     # compute the max line length for cmd + usage
     mlen = max([len(" %s" % cmd) for cmd in commands])
     mlen = min(60, mlen) # should not get here...
+
     # and format a nice command list
-    print "Commands:"
+    ToStdout("Commands:")
     for cmd in sortedcmds:
       cmdstr = " %s" % (cmd,)
       help_text = commands[cmd][4]
-      help_lines = textwrap.wrap(help_text, 79-3-mlen)
-      print "%-*s - %s" % (mlen, cmdstr, help_lines.pop(0))
+      help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
+      ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
       for line in help_lines:
-        print "%-*s   %s" % (mlen, "", line)
-    print
+        ToStdout("%-*s   %s", mlen, "", line)
+
+    ToStdout("")
+
     return None, None, None
 
   # get command, unalias it, and look it up in commands
@@ -385,15 +392,13 @@ def _ParseArgs(argv, commands, aliases):
   options, args = parser.parse_args()
   if nargs is None:
     if len(args) != 0:
-      print >> sys.stderr, ("Error: Command %s expects no arguments" % cmd)
+      ToStderr("Error: Command %s expects no arguments", cmd)
       return None, None, None
   elif nargs < 0 and len(args) != -nargs:
-    print >> sys.stderr, ("Error: Command %s expects %d argument(s)" %
-                         (cmd, -nargs))
+    ToStderr("Error: Command %s expects %d argument(s)", cmd, -nargs)
     return None, None, None
   elif nargs >= 0 and len(args) < nargs:
-    print >> sys.stderr, ("Error: Command %s expects at least %d argument(s)" %
-                         (cmd, nargs))
+    ToStderr("Error: Command %s expects at least %d argument(s)", cmd, nargs)
     return None, None, None
 
   return func, options, args
@@ -438,10 +443,10 @@ def AskUser(text, choices=None):
     choices = [('y', True, 'Perform the operation'),
                ('n', False, 'Do not perform the operation')]
   if not choices or not isinstance(choices, list):
-    raise errors.ProgrammerError("Invalid choiches argument to AskUser")
+    raise errors.ProgrammerError("Invalid choices argument to AskUser")
   for entry in choices:
     if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
-      raise errors.ProgrammerError("Invalid choiches element to AskUser")
+      raise errors.ProgrammerError("Invalid choices element to AskUser")
 
   answer = choices[-1][1]
   new_text = []
@@ -539,7 +544,7 @@ def PollJob(job_id, cl=None, feedback_fn=None):
           feedback_fn(log_entry[1:])
         else:
           encoded = utils.SafeEncode(message)
-          print "%s %s" % (time.ctime(utils.MergeTime(timestamp)), encoded)
+          ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded)
         prev_logmsg_serial = max(prev_logmsg_serial, serial)
 
     # TODO: Handle canceled and archived jobs
@@ -735,8 +740,6 @@ def GenericMain(commands, override=None, aliases=None):
   utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
                      stderr_logging=True, program=binary)
 
-  utils.debug = options.debug
-
   if old_cmdline:
     logging.info("run with arguments '%s'", old_cmdline)
   else:
@@ -747,7 +750,7 @@ def GenericMain(commands, override=None, aliases=None):
   except (errors.GenericError, luxi.ProtocolError,
           JobSubmittedException), err:
     result, err_msg = FormatError(err)
-    logging.exception("Error durring command processing")
+    logging.exception("Error during command processing")
     ToStderr(err_msg)
 
   return result
@@ -994,15 +997,23 @@ class JobExecutor(object):
       cl = GetClient()
     self.cl = cl
     self.verbose = verbose
+    self.jobs = []
 
   def QueueJob(self, name, *ops):
-    """Submit a job for execution.
+    """Record a job for later submit.
 
     @type name: string
     @param name: a description of the job, will be used in WaitJobSet
     """
-    job_id = SendJob(ops, cl=self.cl)
-    self.queue.append((job_id, name))
+    self.queue.append((name, ops))
+
+  def SubmitPending(self):
+    """Submit all pending jobs.
+
+    """
+    results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
+    for ((status, data), (name, _)) in zip(results, self.queue):
+      self.jobs.append((status, data, name))
 
   def GetResults(self):
     """Wait for and return the results of all jobs.
@@ -1013,10 +1024,18 @@ class JobExecutor(object):
         there will be the error message
 
     """
+    if not self.jobs:
+      self.SubmitPending()
     results = []
     if self.verbose:
-      ToStdout("Submitted jobs %s", ", ".join(row[0] for row in self.queue))
-    for jid, name in self.queue:
+      ok_jobs = [row[1] for row in self.jobs if row[0]]
+      if ok_jobs:
+        ToStdout("Submitted jobs %s", ", ".join(ok_jobs))
+    for submit_status, jid, name in self.jobs:
+      if not submit_status:
+        ToStderr("Failed to submit job for %s: %s", name, jid)
+        results.append((False, jid))
+        continue
       if self.verbose:
         ToStdout("Waiting for job %s for %s...", jid, name)
       try:
@@ -1041,5 +1060,10 @@ class JobExecutor(object):
     if wait:
       return self.GetResults()
     else:
-      for jid, name in self.queue:
-        ToStdout("%s: %s", jid, name)
+      if not self.jobs:
+        self.SubmitPending()
+      for status, result, name in self.jobs:
+        if status:
+          ToStdout("%s: %s", result, name)
+        else:
+          ToStderr("Failure for %s: %s", name, result)
index a9b3bac..bc46ba4 100644 (file)
 import os
 import os.path
 import time
-import tempfile
 import re
 import platform
 import logging
 import copy
-import random
 
 from ganeti import ssh
 from ganeti import utils
@@ -40,7 +38,6 @@ from ganeti import hypervisor
 from ganeti import locking
 from ganeti import constants
 from ganeti import objects
-from ganeti import opcodes
 from ganeti import serializer
 from ganeti import ssconf
 
@@ -68,7 +65,7 @@ class LogicalUnit(object):
   def __init__(self, processor, op, context, rpc):
     """Constructor for LogicalUnit.
 
-    This needs to be overriden in derived classes in order to check op
+    This needs to be overridden in derived classes in order to check op
     validity.
 
     """
@@ -116,7 +113,7 @@ class LogicalUnit(object):
     CheckPrereq, doing these separate is better because:
 
       - ExpandNames is left as as purely a lock-related function
-      - CheckPrereq is run after we have aquired locks (and possible
+      - CheckPrereq is run after we have acquired locks (and possible
         waited for them)
 
     The function is allowed to change the self.op attribute so that
@@ -454,7 +451,7 @@ def _CheckNodeNotDrained(lu, node):
 
 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                           memory, vcpus, nics, disk_template, disks,
-                          bep, hvp, hypervisor):
+                          bep, hvp, hypervisor_name):
   """Builds instance related env variables for hooks
 
   This builds the hook environment from individual variables.
@@ -477,15 +474,15 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
   @param nics: list of tuples (ip, bridge, mac) representing
       the NICs the instance  has
   @type disk_template: string
-  @param disk_template: the distk template of the instance
+  @param disk_template: the disk template of the instance
   @type disks: list
   @param disks: the list of (size, mode) pairs
   @type bep: dict
   @param bep: the backend parameters for the instance
   @type hvp: dict
   @param hvp: the hypervisor parameters for the instance
-  @type hypervisor: string
-  @param hypervisor: the hypervisor for the instance
+  @type hypervisor_name: string
+  @param hypervisor_name: the hypervisor for the instance
   @rtype: dict
   @return: the hook environment for this instance
 
@@ -504,7 +501,7 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
     "INSTANCE_MEMORY": memory,
     "INSTANCE_VCPUS": vcpus,
     "INSTANCE_DISK_TEMPLATE": disk_template,
-    "INSTANCE_HYPERVISOR": hypervisor,
+    "INSTANCE_HYPERVISOR": hypervisor_name,
   }
 
   if nics:
@@ -568,7 +565,7 @@ def _BuildInstanceHookEnvByObject(lu, instance, override=None):
     'disks': [(disk.size, disk.mode) for disk in instance.disks],
     'bep': bep,
     'hvp': hvp,
-    'hypervisor': instance.hypervisor,
+    'hypervisor_name': instance.hypervisor,
   }
   if override:
     args.update(override)
@@ -592,10 +589,10 @@ def _AdjustCandidatePool(lu):
 
 
 def _CheckInstanceBridgesExist(lu, instance):
-  """Check that the brigdes needed by an instance exist.
+  """Check that the bridges needed by an instance exist.
 
   """
-  # check bridges existance
+  # check bridges existence
   brlist = [nic.bridge for nic in instance.nics]
   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
   result.Raise()
@@ -616,7 +613,7 @@ class LUDestroyCluster(NoHooksLU):
 
     This checks whether the cluster is empty.
 
-    Any errors are signalled by raising errors.OpPrereqError.
+    Any errors are signaled by raising errors.OpPrereqError.
 
     """
     master = self.cfg.GetMasterNode()
@@ -669,7 +666,7 @@ class LUVerifyCluster(LogicalUnit):
     Test list:
 
       - compares ganeti version
-      - checks vg existance and size > 20G
+      - checks vg existence and size > 20G
       - checks config file checksum
       - checks ssh to other nodes
 
@@ -908,7 +905,7 @@ class LUVerifyCluster(LogicalUnit):
           if bep[constants.BE_AUTO_BALANCE]:
             needed_mem += bep[constants.BE_MEMORY]
         if nodeinfo['mfree'] < needed_mem:
-          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
+          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                       " failovers should node %s fail" % (node, prinode))
           bad = True
     return bad
@@ -927,7 +924,7 @@ class LUVerifyCluster(LogicalUnit):
   def BuildHooksEnv(self):
     """Build hooks env.
 
-    Cluster-Verify hooks just rone in the post phase and their failure makes
+    Cluster-Verify hooks just ran in the post phase and their failure makes
     the output be logged in the verify output and the verification to fail.
 
     """
@@ -1194,7 +1191,7 @@ class LUVerifyCluster(LogicalUnit):
     return not bad
 
   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
-    """Analize the post-hooks' result
+    """Analyze the post-hooks' result
 
     This method analyses the hook result, handles it, and sends some
     nicely-formatted feedback back to the user.
@@ -1293,7 +1290,6 @@ class LUVerifyDisks(NoHooksLU):
 
     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
 
-    to_act = set()
     for node in nodes:
       # node_volume
       lvs = node_lvs[node]
@@ -1536,7 +1532,7 @@ def _RecursiveCheckIfLVMBased(disk):
 
   @type disk: L{objects.Disk}
   @param disk: the disk to check
-  @rtype: booleean
+  @rtype: boolean
   @return: boolean indicating whether a LD_LV dev_type was found or not
 
   """
@@ -1642,6 +1638,13 @@ class LUSetClusterParams(LogicalUnit):
 
     if self.op.enabled_hypervisors is not None:
       self.hv_list = self.op.enabled_hypervisors
+      if not self.hv_list:
+        raise errors.OpPrereqError("Enabled hypervisors list must contain at"
+                                   " least one member")
+      invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
+      if invalid_hvs:
+        raise errors.OpPrereqError("Enabled hypervisors contains invalid"
+                                   " entries: %s" % invalid_hvs)
     else:
       self.hv_list = cluster.enabled_hypervisors
 
@@ -1937,7 +1940,7 @@ class LURemoveNode(LogicalUnit):
      - it does not have primary or secondary instances
      - it's not the master
 
-    Any errors are signalled by raising errors.OpPrereqError.
+    Any errors are signaled by raising errors.OpPrereqError.
 
     """
     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
@@ -2258,7 +2261,7 @@ class LUAddNode(LogicalUnit):
      - it is resolvable
      - its parameters (single/dual homed) matches the cluster
 
-    Any errors are signalled by raising errors.OpPrereqError.
+    Any errors are signaled by raising errors.OpPrereqError.
 
     """
     node_name = self.op.node_name
@@ -2312,7 +2315,7 @@ class LUAddNode(LogicalUnit):
         raise errors.OpPrereqError("The master has a private ip but the"
                                    " new node doesn't have one")
 
-    # checks reachablity
+    # checks reachability
     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
       raise errors.OpPrereqError("Node not reachable by ping")
 
@@ -2403,7 +2406,8 @@ class LUAddNode(LogicalUnit):
                                " new node: %s" % msg)
 
     # Add node to our /etc/hosts, and add key to known_hosts
-    utils.AddHostToEtcHosts(new_node.name)
+    if self.cfg.GetClusterInfo().modify_etc_hosts:
+      utils.AddHostToEtcHosts(new_node.name)
 
     if new_node.secondary_ip != new_node.primary_ip:
       result = self.rpc.call_node_has_ip_address(new_node.name,
@@ -2523,12 +2527,16 @@ class LUSetNodeParams(LogicalUnit):
     """
     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
 
+    if (self.op.master_candidate is not None or
+        self.op.drained is not None or
+        self.op.offline is not None):
+      # we can't change the master's node flags
+      if self.op.node_name == self.cfg.GetMasterNode():
+        raise errors.OpPrereqError("The master role can be changed"
+                                   " only via masterfailover")
+
     if ((self.op.master_candidate == False or self.op.offline == True or
          self.op.drained == True) and node.master_candidate):
-      # we will demote the node from master_candidate
-      if self.op.node_name == self.cfg.GetMasterNode():
-        raise errors.OpPrereqError("The master node has to be a"
-                                   " master candidate, online and not drained")
       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
       num_candidates, _ = self.cfg.GetMasterCandidateStats()
       if num_candidates <= cp_size:
@@ -2635,14 +2643,15 @@ class LUQueryClusterInfo(NoHooksLU):
       "master": cluster.master_node,
       "default_hypervisor": cluster.default_hypervisor,
       "enabled_hypervisors": cluster.enabled_hypervisors,
-      "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
-                        for hypervisor in cluster.enabled_hypervisors]),
+      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
+                        for hypervisor_name in cluster.enabled_hypervisors]),
       "beparams": cluster.beparams,
       "candidate_pool_size": cluster.candidate_pool_size,
       "default_bridge": cluster.default_bridge,
       "master_netdev": cluster.master_netdev,
       "volume_group_name": cluster.volume_group_name,
       "file_storage_dir": cluster.file_storage_dir,
+      "tags": list(cluster.GetTags()),
       }
 
     return result
@@ -2814,7 +2823,7 @@ def _StartInstanceDisks(lu, instance, force):
   """Start the disks of an instance.
 
   """
-  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
+  disks_ok, _ = _AssembleInstanceDisks(lu, instance,
                                            ignore_secondaries=force)
   if not disks_ok:
     _ShutdownInstanceDisks(lu, instance)
@@ -3003,7 +3012,7 @@ class LUStartupInstance(LogicalUnit):
     _CheckNodeOnline(self, instance.primary_node)
 
     bep = self.cfg.GetClusterInfo().FillBE(instance)
-    # check bridges existance
+    # check bridges existence
     _CheckInstanceBridgesExist(self, instance)
 
     remote_info = self.rpc.call_instance_info(instance.primary_node,
@@ -3081,7 +3090,7 @@ class LURebootInstance(LogicalUnit):
 
     _CheckNodeOnline(self, instance.primary_node)
 
-    # check bridges existance
+    # check bridges existence
     _CheckInstanceBridgesExist(self, instance)
 
   def Exec(self, feedback_fn):
@@ -3752,7 +3761,7 @@ class LUFailoverInstance(LogicalUnit):
       self.LogInfo("Not checking memory on the secondary node as"
                    " instance will not be started")
 
-    # check bridge existance
+    # check bridge existence
     brlist = [nic.bridge for nic in instance.nics]
     result = self.rpc.call_bridges_exist(target_node, brlist)
     result.Raise()
@@ -3812,7 +3821,7 @@ class LUFailoverInstance(LogicalUnit):
       logging.info("Starting instance %s on node %s",
                    instance.name, target_node)
 
-      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
+      disks_ok, _ = _AssembleInstanceDisks(self, instance,
                                                ignore_secondaries=True)
       if not disks_ok:
         _ShutdownInstanceDisks(self, instance)
@@ -3890,7 +3899,7 @@ class LUMigrateInstance(LogicalUnit):
                          instance.name, i_be[constants.BE_MEMORY],
                          instance.hypervisor)
 
-    # check bridge existance
+    # check bridge existence
     brlist = [nic.bridge for nic in instance.nics]
     result = self.rpc.call_bridges_exist(target_node, brlist)
     if result.failed or not result.data:
@@ -4326,7 +4335,7 @@ def _GenerateDiskTemplate(lu, template_name,
     if len(secondary_nodes) != 0:
       raise errors.ProgrammerError("Wrong template configuration")
 
-    names = _GenerateUniqueNames(lu, [".disk%d" % i
+    names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                       for i in range(disk_count)])
     for idx, disk in enumerate(disk_info):
       disk_index = idx + base_index
@@ -4343,7 +4352,7 @@ def _GenerateDiskTemplate(lu, template_name,
       [primary_node, remote_node] * len(disk_info), instance_name)
 
     names = []
-    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
+    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                                for i in range(disk_count)]):
       names.append(lv_prefix + "_data")
       names.append(lv_prefix + "_meta")
@@ -4611,6 +4620,12 @@ class LUCreateInstance(LogicalUnit):
         if not utils.IsValidMac(mac.lower()):
           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
                                      mac)
+        else:
+          # or validate/reserve the current one
+          if self.cfg.IsMacInUse(mac):
+            raise errors.OpPrereqError("MAC address %s already in use"
+                                       " in cluster" % mac)
+
       # bridge verification
       bridge = nic.get("bridge", None)
       if bridge is None:
@@ -4749,7 +4764,7 @@ class LUCreateInstance(LogicalUnit):
       disks=[(d["size"], d["mode"]) for d in self.disks],
       bep=self.be_full,
       hvp=self.hv_full,
-      hypervisor=self.op.hypervisor,
+      hypervisor_name=self.op.hypervisor,
     ))
 
     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
@@ -5568,7 +5583,6 @@ class LUReplaceDisks(LogicalUnit):
     logging.debug("Allocated minors %s" % (minors,))
     self.proc.LogStep(4, steps_total, "changing drbd configuration")
     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
-      size = dev.size
       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
       # create new devices on new_node; note that we create two IDs:
       # one without port, so the drbd will be activated without
@@ -6103,7 +6117,7 @@ class LUSetInstanceParams(LogicalUnit):
     This only checks the instance list against the existing names.
 
     """
-    force = self.force = self.op.force
+    self.force = self.op.force
 
     # checking the new params on the primary/secondary nodes
 
@@ -6426,7 +6440,7 @@ class LUExportInstance(LogicalUnit):
     # remove it from its current node. In the future we could fix this by:
     #  - making a tasklet to search (share-lock all), then create the new one,
     #    then one to remove, after
-    #  - removing the removal operation altoghether
+    #  - removing the removal operation altogether
     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
 
   def DeclareLocks(self, level):
@@ -6932,7 +6946,7 @@ class IAllocator(object):
         "master_candidate": ninfo.master_candidate,
         }
 
-      if not ninfo.offline:
+      if not (ninfo.offline or ninfo.drained):
         nresult.Raise()
         if not isinstance(nresult.data, dict):
           raise errors.OpExecError("Can't get data for node %s" % nname)
@@ -7089,7 +7103,6 @@ class IAllocator(object):
     """
     if call_fn is None:
       call_fn = self.lu.rpc.call_iallocator_runner
-    data = self.in_text
 
     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
     result.Raise()
index 4075156..58871d5 100644 (file)
@@ -273,6 +273,20 @@ class ConfigWriter:
     data = self._config_data
     seen_lids = []
     seen_pids = []
+
+    # global cluster checks
+    if not data.cluster.enabled_hypervisors:
+      result.append("enabled hypervisors list doesn't have any entries")
+    invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES
+    if invalid_hvs:
+      result.append("enabled hypervisors contains invalid entries: %s" %
+                    invalid_hvs)
+
+    if data.cluster.master_node not in data.nodes:
+      result.append("cluster has invalid primary node '%s'" %
+                    data.cluster.master_node)
+
+    # per-instance checks
     for instance_name in data.instances:
       instance = data.instances[instance_name]
       if instance.primary_node not in data.nodes:
@@ -474,8 +488,8 @@ class ConfigWriter:
     def _AppendUsedPorts(instance_name, disk, used):
       duplicates = []
       if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
-        nodeA, nodeB, dummy, minorA, minorB = disk.logical_id[:5]
-        for node, port in ((nodeA, minorA), (nodeB, minorB)):
+        node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
+        for node, port in ((node_a, minor_a), (node_b, minor_b)):
           assert node in used, ("Node '%s' of instance '%s' not found"
                                 " in node list" % (node, instance_name))
           if port in used[node]:
@@ -796,7 +810,7 @@ class ConfigWriter:
                                     self._config_data.instances.keys())
 
   def _UnlockedGetInstanceInfo(self, instance_name):
-    """Returns informations about an instance.
+    """Returns information about an instance.
 
     This function is for internal use, when the config lock is already held.
 
@@ -808,9 +822,9 @@ class ConfigWriter:
 
   @locking.ssynchronized(_config_lock, shared=1)
   def GetInstanceInfo(self, instance_name):
-    """Returns informations about an instance.
+    """Returns information about an instance.
 
-    It takes the information from the configuration file. Other informations of
+    It takes the information from the configuration file. Other information of
     an instance are taken from the live systems.
 
     @param instance_name: name of the instance, e.g.
@@ -1150,32 +1164,6 @@ class ConfigWriter:
       constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
       }
 
-  @locking.ssynchronized(_config_lock)
-  def InitConfig(self, version, cluster_config, master_node_config):
-    """Create the initial cluster configuration.
-
-    It will contain the current node, which will also be the master
-    node, and no instances.
-
-    @type version: int
-    @param version: Configuration version
-    @type cluster_config: objects.Cluster
-    @param cluster_config: Cluster configuration
-    @type master_node_config: objects.Node
-    @param master_node_config: Master node configuration
-
-    """
-    nodes = {
-      master_node_config.name: master_node_config,
-      }
-
-    self._config_data = objects.ConfigData(version=version,
-                                           cluster=cluster_config,
-                                           nodes=nodes,
-                                           instances={},
-                                           serial_no=1)
-    self._WriteConfig()
-
   @locking.ssynchronized(_config_lock, shared=1)
   def GetVGName(self):
     """Return the volume group name.
@@ -1208,7 +1196,7 @@ class ConfigWriter:
 
   @locking.ssynchronized(_config_lock, shared=1)
   def GetClusterInfo(self):
-    """Returns informations about the cluster
+    """Returns information about the cluster
 
     @rtype: L{objects.Cluster}
     @return: the cluster object
index 3949f56..e5dafb1 100644 (file)
@@ -224,6 +224,7 @@ DDM_REMOVE = 'remove'
 # common exit codes
 EXIT_SUCCESS = 0
 EXIT_FAILURE = 1
+EXIT_NOTCLUSTER = 5
 EXIT_NOTMASTER = 11
 EXIT_NODESETUP_ERROR = 12
 EXIT_CONFIRMATION = 13 # need user confirmation
@@ -307,6 +308,7 @@ HV_INITRD_PATH = "initrd_path"
 HV_ROOT_PATH = "root_path"
 HV_SERIAL_CONSOLE = "serial_console"
 HV_USB_MOUSE = "usb_mouse"
+HV_DEVICE_MODEL = "device_model"
 
 HVS_PARAMETER_TYPES = {
   HV_BOOT_ORDER: VTYPE_STRING,
@@ -325,6 +327,7 @@ HVS_PARAMETER_TYPES = {
   HV_ROOT_PATH: VTYPE_STRING,
   HV_SERIAL_CONSOLE: VTYPE_BOOL,
   HV_USB_MOUSE: VTYPE_STRING,
+  HV_DEVICE_MODEL: VTYPE_STRING,
   }
 
 HVS_PARAMETERS = frozenset(HVS_PARAMETER_TYPES.keys())
@@ -453,20 +456,24 @@ JOB_STATUS_CANCELED = "canceled"
 JOB_STATUS_SUCCESS = "success"
 JOB_STATUS_ERROR = "error"
 
+# OpCode status
+# not yet finalized
 OP_STATUS_QUEUED = "queued"
 OP_STATUS_WAITLOCK = "waiting"
 OP_STATUS_CANCELING = "canceling"
 OP_STATUS_RUNNING = "running"
+# finalized
 OP_STATUS_CANCELED = "canceled"
 OP_STATUS_SUCCESS = "success"
 OP_STATUS_ERROR = "error"
+OPS_FINALIZED = frozenset([OP_STATUS_CANCELED,
+                           OP_STATUS_SUCCESS,
+                           OP_STATUS_ERROR])
 
 # Execution log types
 ELOG_MESSAGE = "message"
 ELOG_PROGRESS = "progress"
 
-# Temporary RAPI constants until we have cluster parameters
-RAPI_ENABLE = True
 RAPI_PORT = 5080
 
 # max dynamic devices
@@ -505,6 +512,8 @@ HVC_DEFAULTS = {
     HV_VNC_BIND_ADDRESS: '0.0.0.0',
     HV_ACPI: True,
     HV_PAE: True,
+    HV_KERNEL_PATH: "/usr/lib/xen/boot/hvmloader",
+    HV_DEVICE_MODEL: "/usr/lib/xen/bin/qemu-dm",
     },
   HT_KVM: {
     HV_KERNEL_PATH: "/boot/vmlinuz-2.6-kvmU",
index 26f9cdf..5115837 100644 (file)
@@ -215,7 +215,7 @@ class Mainloop(object):
 
     """
     for owner in self._signal_wait:
-      owner.OnSignal(signal.SIGCHLD)
+      owner.OnSignal(signum)
 
   def RegisterIO(self, owner, fd, condition):
     """Registers a receiver for I/O notifications
index 008cf9c..c98fa58 100644 (file)
@@ -367,15 +367,12 @@ def SocketOperation(sock, op, arg1, timeout):
   # TODO: event_poll/event_check/override
   if op in (SOCKOP_SEND, SOCKOP_HANDSHAKE):
     event_poll = select.POLLOUT
-    event_check = select.POLLOUT
 
   elif op == SOCKOP_RECV:
     event_poll = select.POLLIN
-    event_check = select.POLLIN | select.POLLPRI
 
   elif op == SOCKOP_SHUTDOWN:
     event_poll = None
-    event_check = None
 
     # The timeout is only used when OpenSSL requests polling for a condition.
     # It is not advisable to have no timeout for shutdown.
@@ -744,7 +741,7 @@ class HttpMessageWriter(object):
   def HasMessageBody(self):
     """Checks whether the HTTP message contains a body.
 
-    Can be overriden by subclasses.
+    Can be overridden by subclasses.
 
     """
     return bool(self._msg.body)
@@ -937,7 +934,7 @@ class HttpMessageReader(object):
   def ParseStartLine(self, start_line):
     """Parses the start line of a message.
 
-    Must be overriden by subclass.
+    Must be overridden by subclass.
 
     @type start_line: string
     @param start_line: Start line string
index 8a8d720..670b897 100644 (file)
 """
 
 import logging
-import time
 import re
 import base64
 import binascii
 
-from ganeti import constants
 from ganeti import utils
 from ganeti import http
 
@@ -80,7 +78,7 @@ class HttpServerRequestAuthentication(object):
   def GetAuthRealm(self, req):
     """Returns the authentication realm for a request.
 
-    MAY be overriden by a subclass, which then can return different realms for
+    MAY be overridden by a subclass, which then can return different realms for
     different paths. Returning "None" means no authentication is needed for a
     request.
 
@@ -195,7 +193,7 @@ class HttpServerRequestAuthentication(object):
   def Authenticate(self, req, user, password):
     """Checks the password for a user.
 
-    This function MUST be overriden by a subclass.
+    This function MUST be overridden by a subclass.
 
     """
     raise NotImplementedError()
index 776fade..717581f 100644 (file)
 
 """
 
-import BaseHTTPServer
-import cgi
-import logging
-import OpenSSL
 import os
 import select
 import socket
-import sys
-import time
-import signal
 import errno
 import threading
 
-from ganeti import constants
-from ganeti import serializer
 from ganeti import workerpool
-from ganeti import utils
 from ganeti import http
 
 
index b74eb36..0afdcd0 100644 (file)
@@ -31,9 +31,6 @@ import socket
 import time
 import signal
 
-from ganeti import constants
-from ganeti import serializer
-from ganeti import utils
 from ganeti import http
 
 
@@ -498,7 +495,7 @@ class HttpServer(http.HttpBase):
           # As soon as too many children run, we'll not respond to new
           # requests. The real solution would be to add a timeout for children
           # and killing them after some time.
-          pid, status = os.waitpid(0, 0)
+          pid, _ = os.waitpid(0, 0)
         except os.error:
           pid = None
         if pid and pid in self._children:
@@ -536,14 +533,14 @@ class HttpServer(http.HttpBase):
   def PreHandleRequest(self, req):
     """Called before handling a request.
 
-    Can be overriden by a subclass.
+    Can be overridden by a subclass.
 
     """
 
   def HandleRequest(self, req):
     """Handles a request.
 
-    Must be overriden by subclass.
+    Must be overridden by subclass.
 
     """
     raise NotImplementedError()
index ccac842..73ed60e 100644 (file)
@@ -25,7 +25,6 @@
 
 import os
 import os.path
-import re
 
 from ganeti import utils
 from ganeti import constants
index 4579f4d..8a29d6b 100644 (file)
@@ -89,7 +89,7 @@ class XenHypervisor(hv_base.BaseHypervisor):
     @return: list of (name, id, memory, vcpus, state, time spent)
 
     """
-    for dummy in range(5):
+    for _ in range(5):
       result = utils.RunCmd(["xm", "list"])
       if not result.failed:
         break
@@ -518,6 +518,8 @@ class XenHvmHypervisor(XenHypervisor):
     constants.HV_NIC_TYPE,
     constants.HV_PAE,
     constants.HV_VNC_BIND_ADDRESS,
+    constants.HV_KERNEL_PATH,
+    constants.HV_DEVICE_MODEL,
     ]
 
   @classmethod
@@ -559,6 +561,19 @@ class XenHvmHypervisor(XenHypervisor):
                                    " be an absolute path or None, not %s" %
                                    iso_path)
 
+    if not hvparams[constants.HV_KERNEL_PATH]:
+      raise errors.HypervisorError("Need a kernel for the instance")
+
+    if not os.path.isabs(hvparams[constants.HV_KERNEL_PATH]):
+      raise errors.HypervisorError("The kernel path must be an absolute path")
+
+    if not hvparams[constants.HV_DEVICE_MODEL]:
+      raise errors.HypervisorError("Need a device model for the instance")
+
+    if not os.path.isabs(hvparams[constants.HV_DEVICE_MODEL]):
+      raise errors.HypervisorError("The device model must be an absolute path")
+
+
   def ValidateParameters(self, hvparams):
     """Check the given parameters for validity.
 
@@ -580,6 +595,16 @@ class XenHvmHypervisor(XenHypervisor):
                                    " an existing regular file, not %s" %
                                    iso_path)
 
+    kernel_path = hvparams[constants.HV_KERNEL_PATH]
+    if not os.path.isfile(kernel_path):
+      raise errors.HypervisorError("Instance kernel '%s' not found or"
+                                   " not a file" % kernel_path)
+
+    device_model = hvparams[constants.HV_DEVICE_MODEL]
+    if not os.path.isfile(device_model):
+      raise errors.HypervisorError("Device model '%s' not found or"
+                                   " not a file" % device_model)
+
   @classmethod
   def _WriteConfigFile(cls, instance, block_devices):
     """Create a Xen 3.1 HVM config file.
@@ -589,25 +614,25 @@ class XenHvmHypervisor(XenHypervisor):
 
     config = StringIO()
     config.write("# this is autogenerated by Ganeti, please do not edit\n#\n")
-    config.write("kernel = '/usr/lib/xen/boot/hvmloader'\n")
+
+    # kernel handling
+    kpath = hvp[constants.HV_KERNEL_PATH]
+    config.write("kernel = '%s'\n" % kpath)
+
     config.write("builder = 'hvm'\n")
     config.write("memory = %d\n" % instance.beparams[constants.BE_MEMORY])
     config.write("vcpus = %d\n" % instance.beparams[constants.BE_VCPUS])
     config.write("name = '%s'\n" % instance.name)
-    if instance.hvparams[constants.HV_PAE]:
+    if hvp[constants.HV_PAE]:
       config.write("pae = 1\n")
     else:
       config.write("pae = 0\n")
-    if instance.hvparams[constants.HV_ACPI]:
+    if hvp[constants.HV_ACPI]:
       config.write("acpi = 1\n")
     else:
       config.write("acpi = 0\n")
     config.write("apic = 1\n")
-    arch = os.uname()[4]
-    if '64' in arch:
-      config.write("device_model = '/usr/lib64/xen/bin/qemu-dm'\n")
-    else:
-      config.write("device_model = '/usr/lib/xen/bin/qemu-dm'\n")
+    config.write("device_model = '%s'\n" % hvp[constants.HV_DEVICE_MODEL])
     config.write("boot = '%s'\n" % hvp[constants.HV_BOOT_ORDER])
     config.write("sdl = 0\n")
     config.write("usb = 1\n")
index 3364a93..9219cae 100644 (file)
@@ -69,7 +69,7 @@ def TimeStampNow():
 
 
 class _QueuedOpCode(object):
-  """Encasulates an opcode object.
+  """Encapsulates an opcode object.
 
   @ivar log: holds the execution log and consists of tuples
   of the form C{(log_serial, timestamp, level, message)}
@@ -80,6 +80,10 @@ class _QueuedOpCode(object):
   @ivar stop_timestamp: timestamp for the end of the execution
 
   """
+  __slots__ = ["input", "status", "result", "log",
+               "start_timestamp", "end_timestamp",
+               "__weakref__"]
+
   def __init__(self, op):
     """Constructor for the _QuededOpCode.
 
@@ -152,6 +156,11 @@ class _QueuedJob(object):
   @ivar change: a Condition variable we use for waiting for job changes
 
   """
+  __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
+               "received_timestamp", "start_timestamp", "end_timestamp",
+               "change",
+               "__weakref__"]
+
   def __init__(self, queue, job_id, ops):
     """Constructor for the _QueuedJob.
 
@@ -286,7 +295,7 @@ class _QueuedJob(object):
     """Selectively returns the log entries.
 
     @type newer_than: None or int
-    @param newer_than: if this is None, return all log enties,
+    @param newer_than: if this is None, return all log entries,
         otherwise return only the log entries with serial higher
         than this value
     @rtype: list
@@ -304,6 +313,26 @@ class _QueuedJob(object):
 
     return entries
 
+  def MarkUnfinishedOps(self, status, result):
+    """Mark unfinished opcodes with a given status and result.
+
+    This is an utility function for marking all running or waiting to
+    be run opcodes with a given status. Opcodes which are already
+    finalised are not changed.
+
+    @param status: a given opcode status
+    @param result: the opcode result
+
+    """
+    not_marked = True
+    for op in self.ops:
+      if op.status in constants.OPS_FINALIZED:
+        assert not_marked, "Finalized opcodes found after non-finalized ones"
+        continue
+      op.status = status
+      op.result = result
+      not_marked = False
+
 
 class _JobQueueWorker(workerpool.BaseWorker):
   """The actual job workers.
@@ -353,6 +382,15 @@ class _JobQueueWorker(workerpool.BaseWorker):
         count = len(job.ops)
         for idx, op in enumerate(job.ops):
           op_summary = op.input.Summary()
+          if op.status == constants.OP_STATUS_SUCCESS:
+            # this is a job that was partially completed before master
+            # daemon shutdown, so it can be expected that some opcodes
+            # are already completed successfully (if any did error
+            # out, then the whole job should have been aborted and not
+            # resubmitted for processing)
+            logging.info("Op %s/%s: opcode %s already processed, skipping",
+                         idx + 1, count, op_summary)
+            continue
           try:
             logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                          op_summary)
@@ -446,7 +484,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
       queue.acquire()
       try:
         try:
-          job.run_op_idx = -1
+          job.run_op_index = -1
           job.end_timestamp = TimeStampNow()
           queue.UpdateJobUnlocked(job)
         finally:
@@ -469,7 +507,7 @@ class _JobQueueWorkerPool(workerpool.WorkerPool):
 
 
 class JobQueue(object):
-  """Quue used to manaage the jobs.
+  """Queue used to manage the jobs.
 
   @cvar _RE_JOB_FILE: regex matching the valid job file names
 
@@ -575,9 +613,8 @@ class JobQueue(object):
                           constants.JOB_STATUS_CANCELING):
             logging.warning("Unfinished job %s found: %s", job.id, job)
             try:
-              for op in job.ops:
-                op.status = constants.OP_STATUS_ERROR
-                op.result = "Unclean master daemon shutdown"
+              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+                                    "Unclean master daemon shutdown")
             finally:
               self.UpdateJobUnlocked(job)
 
@@ -651,7 +688,7 @@ class JobQueue(object):
 
     Since we aim to keep consistency should this node (the current
     master) fail, we will log errors if our rpc fail, and especially
-    log the case when more than half of the nodes failes.
+    log the case when more than half of the nodes fails.
 
     @param result: the data as returned from the rpc call
     @type nodes: list
@@ -759,26 +796,31 @@ class JobQueue(object):
     """
     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
 
-  def _NewSerialUnlocked(self):
+  def _NewSerialsUnlocked(self, count):
     """Generates a new job identifier.
 
     Job identifiers are unique during the lifetime of a cluster.
 
+    @type count: integer
+    @param count: how many serials to return
     @rtype: str
     @return: a string representing the job identifier.
 
     """
+    assert count > 0
     # New number
-    serial = self._last_serial + 1
+    serial = self._last_serial + count
 
     # Write to file
     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                         "%s\n" % serial)
 
+    result = [self._FormatJobID(v)
+              for v in range(self._last_serial, serial + 1)]
     # Keep it only if we were able to write the file
     self._last_serial = serial
 
-    return self._FormatJobID(serial)
+    return result
 
   @staticmethod
   def _GetJobPath(job_id):
@@ -934,7 +976,7 @@ class JobQueue(object):
     and in the future we might merge them.
 
     @type drain_flag: boolean
-    @param drain_flag: wheter to set or unset the drain flag
+    @param drain_flag: Whether to set or unset the drain flag
 
     """
     if drain_flag:
@@ -943,14 +985,15 @@ class JobQueue(object):
       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
     return True
 
-  @utils.LockedMethod
   @_RequireOpenQueue
-  def SubmitJob(self, ops):
+  def _SubmitJobUnlocked(self, job_id, ops):
     """Create and store a new job.
 
     This enters the job into our job queue and also puts it on the new
     queue, in order for it to be picked up by the queue processors.
 
+    @type job_id: job ID
+    @param jod_id: the job ID for the new job
     @type ops: list
     @param ops: The list of OpCodes that will become the new job.
     @rtype: job ID
@@ -959,7 +1002,7 @@ class JobQueue(object):
 
     """
     if self._IsQueueMarkedDrain():
-      raise errors.JobQueueDrainError()
+      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
 
     # Check job queue size
     size = len(self._ListJobFiles())
@@ -972,8 +1015,6 @@ class JobQueue(object):
     if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
       raise errors.JobQueueFull()
 
-    # Get job identifier
-    job_id = self._NewSerialUnlocked()
     job = _QueuedJob(self, job_id, ops)
 
     # Write to disk
@@ -987,6 +1028,39 @@ class JobQueue(object):
 
     return job.id
 
+  @utils.LockedMethod
+  @_RequireOpenQueue
+  def SubmitJob(self, ops):
+    """Create and store a new job.
+
+    @see: L{_SubmitJobUnlocked}
+
+    """
+    job_id = self._NewSerialsUnlocked(1)[0]
+    return self._SubmitJobUnlocked(job_id, ops)
+
+  @utils.LockedMethod
+  @_RequireOpenQueue
+  def SubmitManyJobs(self, jobs):
+    """Create and store multiple jobs.
+
+    @see: L{_SubmitJobUnlocked}
+
+    """
+    results = []
+    all_job_ids = self._NewSerialsUnlocked(len(jobs))
+    for job_id, ops in zip(all_job_ids, jobs):
+      try:
+        data = self._SubmitJobUnlocked(job_id, ops)
+        status = True
+      except errors.GenericError, err:
+        data = str(err)
+        status = False
+      results.append((status, data))
+
+    return results
+
+
   @_RequireOpenQueue
   def UpdateJobUnlocked(self, job):
     """Update a job's on disk storage.
@@ -1034,6 +1108,10 @@ class JobQueue(object):
 
     """
     logging.debug("Waiting for changes in job %s", job_id)
+
+    job_info = None
+    log_entries = None
+
     end_time = time.time() + timeout
     while True:
       delta_time = end_time - time.time()
@@ -1075,7 +1153,10 @@ class JobQueue(object):
 
     logging.debug("Job %s changed", job_id)
 
-    return (job_info, log_entries)
+    if job_info is None and log_entries is None:
+      return None
+    else:
+      return (job_info, log_entries)
 
   @utils.LockedMethod
   @_RequireOpenQueue
@@ -1099,8 +1180,8 @@ class JobQueue(object):
 
     if job_status not in (constants.JOB_STATUS_QUEUED,
                           constants.JOB_STATUS_WAITLOCK):
-      logging.debug("Job %s is no longer in the queue", job.id)
-      return (False, "Job %s is no longer in the queue" % job.id)
+      logging.debug("Job %s is no longer waiting in the queue", job.id)
+      return (False, "Job %s is no longer waiting in the queue" % job.id)
 
     if job_status == constants.JOB_STATUS_QUEUED:
       self.CancelJobUnlocked(job)
@@ -1109,8 +1190,7 @@ class JobQueue(object):
     elif job_status == constants.JOB_STATUS_WAITLOCK:
       # The worker will notice the new status and cancel the job
       try:
-        for op in job.ops:
-          op.status = constants.OP_STATUS_CANCELING
+        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
       finally:
         self.UpdateJobUnlocked(job)
       return (True, "Job %s will be canceled" % job.id)
@@ -1121,9 +1201,8 @@ class JobQueue(object):
 
     """
     try:
-      for op in job.ops:
-        op.status = constants.OP_STATUS_CANCELED
-        op.result = "Job canceled by request"
+      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
+                            "Job canceled by request")
     finally:
       self.UpdateJobUnlocked(job)
 
index 4d9189e..5c59968 100644 (file)
@@ -22,9 +22,7 @@
 """Module implementing the job queue handling."""
 
 import os
-import logging
 import errno
-import re
 
 from ganeti import constants
 from ganeti import errors
index 647e14f..d24abdf 100644 (file)
@@ -297,7 +297,7 @@ class SharedLock:
 
 
 # Whenever we want to acquire a full LockSet we pass None as the value
-# to acquire.  Hide this behing this nicely named constant.
+# to acquire.  Hide this behind this nicely named constant.
 ALL_SET = None
 
 
@@ -498,7 +498,7 @@ class LockSet:
           # Of course something is going to be really wrong, after this.
           if lock._is_owned():
             lock.release()
-            raise
+          raise
 
     except:
       # If something went wrong and we had the set-lock let's release it...
@@ -689,7 +689,7 @@ BGL = 'BGL'
 class GanetiLockManager:
   """The Ganeti Locking Library
 
-  The purpouse of this small library is to manage locking for ganeti clusters
+  The purpose of this small library is to manage locking for ganeti clusters
   in a central place, while at the same time doing dynamic checks against
   possible deadlocks. It will also make it easier to transition to a different
   lock type should we migrate away from python threads.
@@ -774,7 +774,7 @@ class GanetiLockManager:
     """Acquire a set of resource locks, at the same level.
 
     @param level: the level at which the locks shall be acquired;
-        it must be a memmber of LEVELS.
+        it must be a member of LEVELS.
     @param names: the names of the locks which shall be acquired
         (special lock names, or instance/node names)
     @param shared: whether to acquire in shared mode; by default
@@ -809,7 +809,7 @@ class GanetiLockManager:
     mode, before releasing them.
 
     @param level: the level at which the locks shall be released;
-        it must be a memmber of LEVELS
+        it must be a member of LEVELS
     @param names: the names of the locks which shall be released
         (defaults to all the locks acquired at that level)
 
@@ -827,7 +827,7 @@ class GanetiLockManager:
     """Add locks at the specified level.
 
     @param level: the level at which the locks shall be added;
-        it must be a memmber of LEVELS_MOD.
+        it must be a member of LEVELS_MOD.
     @param names: names of the locks to acquire
     @param acquired: whether to acquire the newly added locks
     @param shared: whether the acquisition will be shared
index 308de9f..11ea61d 100644 (file)
@@ -45,6 +45,7 @@ KEY_SUCCESS = "success"
 KEY_RESULT = "result"
 
 REQ_SUBMIT_JOB = "SubmitJob"
+REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
 REQ_CANCEL_JOB = "CancelJob"
 REQ_ARCHIVE_JOB = "ArchiveJob"
@@ -186,12 +187,13 @@ class Transport:
       raise EncodingError("Message terminator found in payload")
     self._CheckSocket()
     try:
+      # TODO: sendall is not guaranteed to send everything
       self.socket.sendall(msg + self.eom)
     except socket.timeout, err:
       raise TimeoutError("Sending timeout: %s" % str(err))
 
   def Recv(self):
-    """Try to receive a messae from the socket.
+    """Try to receive a message from the socket.
 
     In case we already have messages queued, we just return from the
     queue. Otherwise, we try to read data with a _rwtimeout network
@@ -204,10 +206,16 @@ class Transport:
     while not self._msgs:
       if time.time() > etime:
         raise TimeoutError("Extended receive timeout")
-      try:
-        data = self.socket.recv(4096)
-      except socket.timeout, err:
-        raise TimeoutError("Receive timeout: %s" % str(err))
+      while True:
+        try:
+          data = self.socket.recv(4096)
+        except socket.error, err:
+          if err.args and err.args[0] == errno.EAGAIN:
+            continue
+          raise
+        except socket.timeout, err:
+          raise TimeoutError("Receive timeout: %s" % str(err))
+        break
       if not data:
         raise ConnectionClosedError("Connection closed while reading")
       new_msgs = (self._buffer + data).split(self.eom)
@@ -277,7 +285,7 @@ class Client(object):
       old_transp = self.transport
       self.transport = None
       old_transp.Close()
-    except Exception, err:
+    except Exception:
       pass
 
   def CallMethod(self, method, args):
@@ -335,6 +343,12 @@ class Client(object):
     ops_state = map(lambda op: op.__getstate__(), ops)
     return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
 
+  def SubmitManyJobs(self, jobs):
+    jobs_state = []
+    for ops in jobs:
+      jobs_state.append([op.__getstate__() for op in ops])
+    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
+
   def CancelJob(self, job_id):
     return self.CallMethod(REQ_CANCEL_JOB, job_id)
 
index 1e42d12..959a837 100644 (file)
@@ -159,7 +159,7 @@ class Processor(object):
           self.context.glm.add(level, add_locks, acquired=1, shared=share)
         except errors.LockError:
           raise errors.OpPrereqError(
-            "Coudn't add locks (%s), probably because of a race condition"
+            "Couldn't add locks (%s), probably because of a race condition"
             " with another job, who added them first" % add_locks)
       try:
         try:
@@ -188,7 +188,7 @@ class Processor(object):
     @type run_notifier: callable (no arguments) or None
     @param run_notifier:  this function (if callable) will be called when
                           we are about to call the lu's Exec() method, that
-                          is, after we have aquired all locks
+                          is, after we have acquired all locks
 
     """
     if not isinstance(op, opcodes.OpCode):
@@ -362,4 +362,4 @@ class HooksMaster(object):
     phase = constants.HOOKS_PHASE_POST
     hpath = constants.HOOKS_NAME_CFGUPDATE
     nodes = [self.lu.cfg.GetMasterNode()]
-    results = self._RunWrapper(nodes, hpath, phase)
+    self._RunWrapper(nodes, hpath, phase)
index 947678b..d5f446e 100644 (file)
@@ -58,6 +58,7 @@ class ConfigObject(object):
   def __init__(self, **kwargs):
     for k, v in kwargs.iteritems():
       setattr(self, k, v)
+    self.UpgradeConfig()
 
   def __getattr__(self, name):
     if name not in self.__slots__:
@@ -165,6 +166,15 @@ class ConfigObject(object):
     """Implement __repr__ for ConfigObjects."""
     return repr(self.ToDict())
 
+  def UpgradeConfig(self):
+    """Fill defaults for missing configuration values.
+
+    This method will be called at object init time, and its implementation will
+    be object dependent.
+
+    """
+    pass
+
 
 class TaggableObject(ConfigObject):
   """An generic class supporting tags.
@@ -521,10 +531,10 @@ class Disk(ConfigObject):
     """Checks that this disk is correctly configured.
 
     """
-    errors = []
+    all_errors = []
     if self.mode not in constants.DISK_ACCESS_SET:
-      errors.append("Disk access mode '%s' is invalid" % (self.mode, ))
-    return errors
+      all_errors.append("Disk access mode '%s' is invalid" % (self.mode, ))
+    return all_errors
 
 
 class Instance(TaggableObject):
@@ -742,8 +752,30 @@ class Cluster(TaggableObject):
     "hvparams",
     "beparams",
     "candidate_pool_size",
+    "modify_etc_hosts",
     ]
 
+  def UpgradeConfig(self):
+    """Fill defaults for missing configuration values.
+
+    """
+    if self.hvparams is None:
+      self.hvparams = constants.HVC_DEFAULTS
+    else:
+      for hypervisor in self.hvparams:
+        self.hvparams[hypervisor] = self.FillDict(
+            constants.HVC_DEFAULTS[hypervisor], self.hvparams[hypervisor])
+
+    if self.beparams is None:
+      self.beparams = {constants.BEGR_DEFAULT: constants.BEC_DEFAULTS}
+    else:
+      for begroup in self.beparams:
+        self.beparams[begroup] = self.FillDict(constants.BEC_DEFAULTS,
+                                               self.beparams[begroup])
+
+    if self.modify_etc_hosts is None:
+      self.modify_etc_hosts = True
+
   def ToDict(self):
     """Custom function for cluster.
 
index 8fe24a9..6a86477 100644 (file)
@@ -142,14 +142,9 @@ class OpCode(BaseOpCode):
       raise ValueError("Invalid data to LoadOpcode, missing OP_ID")
     op_id = data["OP_ID"]
     op_class = None
-    for item in globals().values():
-      if (isinstance(item, type) and
-          issubclass(item, cls) and
-          hasattr(item, "OP_ID") and
-          getattr(item, "OP_ID") == op_id):
-        op_class = item
-        break
-    if op_class is None:
+    if op_id in OP_MAPPING:
+      op_class = OP_MAPPING[op_id]
+    else:
       raise ValueError("Invalid data to LoadOpCode: OP_ID %s unsupported" %
                        op_id)
     op = op_class()
@@ -598,3 +593,7 @@ class OpTestAllocator(OpCode):
     "mem_size", "disks", "disk_template",
     "os", "tags", "nics", "vcpus", "hypervisor",
     ]
+
+OP_MAPPING = dict([(v.OP_ID, v) for v in globals().values()
+                   if (isinstance(v, type) and issubclass(v, OpCode) and
+                       hasattr(v, "OP_ID"))])
index 7a02527..77a7e62 100644 (file)
@@ -25,8 +25,6 @@
 
 import logging
 
-import ganeti.cli
-
 from ganeti import luxi
 from ganeti import rapi
 from ganeti import http
@@ -247,7 +245,7 @@ class R_Generic(object):
         val = 0
     try:
       val = int(val)
-    except (ValueError, TypeError), err:
+    except (ValueError, TypeError):
       raise http.HttpBadRequest("Invalid value for the"
                                 " '%s' parameter" % (name,))
     return val
index 45f649b..96671ec 100644 (file)
@@ -46,6 +46,8 @@ N_FIELDS = ["name", "offline", "master_candidate", "drained",
             "mtotal", "mnode", "mfree",
             "pinst_cnt", "sinst_cnt", "tags",
             "ctotal", "cnodes", "csockets",
+            "pip", "sip", "serial_no", "role",
+            "pinst_list", "sinst_list",
             ]
 
 
@@ -441,8 +443,7 @@ class R_2_instances_name_reboot(baserlib.R_Generic):
     instance_name = self.items[0]
     reboot_type = self.queryargs.get('type',
                                      [constants.INSTANCE_REBOOT_HARD])[0]
-    ignore_secondaries = bool(self.queryargs.get('ignore_secondaries',
-                                                 [False])[0])
+    ignore_secondaries = bool(self._checkIntVariable('ignore_secondaries'))
     op = opcodes.OpRebootInstance(instance_name=instance_name,
                                   reboot_type=reboot_type,
                                   ignore_secondaries=ignore_secondaries)
@@ -467,7 +468,7 @@ class R_2_instances_name_startup(baserlib.R_Generic):
 
     """
     instance_name = self.items[0]
-    force_startup = bool(self.queryargs.get('force', [False])[0])
+    force_startup = bool(self._checkIntVariable('force'))
     op = opcodes.OpStartupInstance(instance_name=instance_name,
                                    force=force_startup)
 
index 6392f50..e138859 100644 (file)
@@ -31,7 +31,6 @@
 # R0904: Too many public methods
 
 import os
-import socket
 import logging
 import zlib
 import base64
@@ -83,7 +82,7 @@ class RpcResult(object):
   calls we can't raise an exception just because one one out of many
   failed, and therefore we use this class to encapsulate the result.
 
-  @ivar data: the data payload, for successfull results, or None
+  @ivar data: the data payload, for successful results, or None
   @type failed: boolean
   @ivar failed: whether the operation failed at RPC level (not
       application level on the remote node)
@@ -161,7 +160,7 @@ class Client:
   list of nodes, will contact (in parallel) all nodes, and return a
   dict of results (key: node name, value: result).
 
-  One current bug is that generic failure is still signalled by
+  One current bug is that generic failure is still signaled by
   'False' result, which is not good. This overloading of values can
   cause bugs.
 
@@ -220,7 +219,7 @@ class Client:
     @return: List of RPC results
 
     """
-    assert _http_manager, "RPC module not intialized"
+    assert _http_manager, "RPC module not initialized"
 
     _http_manager.ExecRequests(self.nc.values())
 
@@ -269,9 +268,9 @@ class RpcRunner(object):
     @type instance: L{objects.Instance}
     @param instance: an Instance object
     @type hvp: dict or None
-    @param hvp: a dictionary with overriden hypervisor parameters
+    @param hvp: a dictionary with overridden hypervisor parameters
     @type bep: dict or None
-    @param bep: a dictionary with overriden backend parameters
+    @param bep: a dictionary with overridden backend parameters
     @rtype: dict
     @return: the instance dict, with the hvparams filled with the
         cluster defaults
@@ -290,7 +289,7 @@ class RpcRunner(object):
   def _ConnectList(self, client, node_list, call):
     """Helper for computing node addresses.
 
-    @type client: L{Client}
+    @type client: L{ganeti.rpc.Client}
     @param client: a C{Client} instance
     @type node_list: list
     @param node_list: the node list we should connect
@@ -320,7 +319,7 @@ class RpcRunner(object):
   def _ConnectNode(self, client, node, call):
     """Helper for computing one node's address.
 
-    @type client: L{Client}
+    @type client: L{ganeti.rpc.Client}
     @param client: a C{Client} instance
     @type node: str
     @param node: the node we should connect
index 19a95c9..78ae9b0 100644 (file)
@@ -94,12 +94,6 @@ class SimpleConfigWriter(SimpleConfigReader):
   """Simple class to write configuration file.
 
   """
-  def SetMasterNode(self, node):
-    """Change master node.
-
-    """
-    self._config_data["cluster"]["master_node"] = node
-
   def Save(self):
     """Writes configuration file.
 
index 40df999..f0362b4 100644 (file)
@@ -201,7 +201,7 @@ class SshRunner:
     connected to).
 
     This is used to detect problems in ssh known_hosts files
-    (conflicting known hosts) and incosistencies between dns/hosts
+    (conflicting known hosts) and inconsistencies between dns/hosts
     entries and local machine names
 
     @param node: nodename of a host to check; can be short or
index ac781fb..df2d180 100644 (file)
@@ -27,7 +27,6 @@ the command line scripts.
 """
 
 
-import sys
 import os
 import time
 import subprocess
@@ -59,7 +58,6 @@ from ganeti import constants
 _locksheld = []
 _re_shell_unquoted = re.compile('^[-.,=:/_+@A-Za-z0-9]+$')
 
-debug = False
 debug_locks = False
 
 #: when set to True, L{RunCmd} is disabled
@@ -136,7 +134,7 @@ def RunCmd(cmd, env=None, output=None, cwd='/'):
       directory for the command; the default will be /
   @rtype: L{RunResult}
   @return: RunResult instance
-  @raise erors.ProgrammerError: if we call this when forks are disabled
+  @raise errors.ProgrammerError: if we call this when forks are disabled
 
   """
   if no_fork:
@@ -687,7 +685,7 @@ def TryConvert(fn, val):
   """
   try:
     nv = fn(val)
-  except (ValueError, TypeError), err:
+  except (ValueError, TypeError):
     nv = val
   return nv
 
@@ -701,7 +699,7 @@ def IsValidIP(ip):
   @type ip: str
   @param ip: the address to be checked
   @rtype: a regular expression match object
-  @return: a regular epression match object, or None if the
+  @return: a regular expression match object, or None if the
       address is not valid
 
   """
@@ -734,7 +732,7 @@ def BuildShellCmd(template, *args):
 
   This function will check all arguments in the args list so that they
   are valid shell parameters (i.e. they don't contain shell
-  metacharaters). If everything is ok, it will return the result of
+  metacharacters). If everything is ok, it will return the result of
   template % args.
 
   @type template: str
@@ -1063,7 +1061,7 @@ def ShellQuoteArgs(args):
   @type args: list
   @param args: list of arguments to be quoted
   @rtype: str
-  @return: the quoted arguments concatenaned with spaces
+  @return: the quoted arguments concatenated with spaces
 
   """
   return ' '.join([ShellQuote(i) for i in args])
@@ -1080,7 +1078,7 @@ def TcpPing(target, port, timeout=10, live_port_needed=False, source=None):
   @type port: int
   @param port: the port to connect to
   @type timeout: int
-  @param timeout: the timeout on the connection attemp
+  @param timeout: the timeout on the connection attempt
   @type live_port_needed: boolean
   @param live_port_needed: whether a closed port will cause the
       function to return failure, as if there was a timeout
@@ -1097,7 +1095,7 @@ def TcpPing(target, port, timeout=10, live_port_needed=False, source=None):
   if source is not None:
     try:
       sock.bind((source, 0))
-    except socket.error, (errcode, errstring):
+    except socket.error, (errcode, _):
       if errcode == errno.EADDRNOTAVAIL:
         success = False
 
@@ -1122,7 +1120,7 @@ def OwnIpAddress(address):
   address.
 
   @type address: string
-  @param address: the addres to check
+  @param address: the address to check
   @rtype: bool
   @return: True if we own the address
 
@@ -1218,7 +1216,7 @@ def ReadFile(file_name, size=None):
   @type size: None or int
   @param size: Read at most size bytes
   @rtype: str
-  @return: the (possibly partial) conent of the file
+  @return: the (possibly partial) content of the file
 
   """
   f = open(file_name, "r")
@@ -1360,14 +1358,14 @@ def FirstFree(seq, base=0):
 
 def all(seq, pred=bool):
   "Returns True if pred(x) is True for every element in the iterable"
-  for elem in itertools.ifilterfalse(pred, seq):
+  for _ in itertools.ifilterfalse(pred, seq):
     return False
   return True
 
 
 def any(seq, pred=bool):
   "Returns True if pred(x) is True for at least one element in the iterable"
-  for elem in itertools.ifilter(pred, seq):
+  for _ in itertools.ifilter(pred, seq):
     return True
   return False
 
@@ -1378,7 +1376,7 @@ def UniqueSequence(seq):
   Element order is preserved.
 
   @type seq: sequence
-  @param seq: the sequence with the source elementes
+  @param seq: the sequence with the source elements
   @rtype: list
   @return: list of unique elements from seq
 
@@ -1390,7 +1388,7 @@ def UniqueSequence(seq):
 def IsValidMac(mac):
   """Predicate to check if a MAC address is valid.
 
-  Checks wether the supplied MAC address is formally correct, only
+  Checks whether the supplied MAC address is formally correct, only
   accepts colon separated format.
 
   @type mac: str
@@ -1552,7 +1550,6 @@ def RemovePidFile(name):
   @param name: the daemon name used to derive the pidfile name
 
   """
-  pid = os.getpid()
   pidfilename = DaemonPidFileName(name)
   # TODO: we could check here that the file contains our pid
   try:
@@ -1831,7 +1828,7 @@ def SafeEncode(text):
 
   """
   if isinstance(text, unicode):
-    # onli if unicode; if str already, we handle it below
+    # only if unicode; if str already, we handle it below
     text = text.encode('ascii', 'backslashreplace')
   resu = ""
   for char in text:
index 89fd9e4..a42a441 100644 (file)
 
       <cmdsynopsis>
         <command>submit-job</command>
-
+        <arg choice="opt">--verbose</arg>
+        <arg choice="opt">--timing-stats</arg>
+        <arg choice="opt">--job-repeat <option>N</option></arg>
+        <arg choice="opt">--op-repeat <option>N</option></arg>
         <arg choice="req" rep="repeat">opcodes_file</arg>
       </cmdsynopsis>
 
         command line.
       </para>
 
+      <para>
+        The <option>verbose</option> option will job the job IDs of
+        the submitted jobs and the progress in waiting for the jobs;
+        the <option>timing-stats</option> option will show some
+        overall statistics with the number of total opcodes and jobs
+        submitted, and time time for each stage (submit, exec, total).
+      </para>
+
+      <para>
+        The <option>job-repeat</option> and <option>op-repeat</option>
+        options allow to submit multiple copies of the passed
+        arguments; the job repeat will cause N copies of each job
+        (input file) to be submitted (equivalent to passing the
+        arguments N times) while the op repeat will cause each job to
+        contain multiple copies of the opcodes (equivalent to each
+        file containing N copies of the opcodes).
+      </para>
+
     </refsect2>
 
   </refsect1>
index d17a4a6..a703d68 100644 (file)
@@ -1229,32 +1229,48 @@ instance5: 11225
         <cmdsynopsis>
           <command>reinstall</command>
           <arg choice="opt">-o <replaceable>os-type</replaceable></arg>
-          <arg choice="opt">-f <replaceable>force</replaceable></arg>
           <arg>--select-os</arg>
+          <arg choice="opt">-f <replaceable>force</replaceable></arg>
+          <arg>--force-multiple</arg>
+          <sbr>
+          <group choice="opt">
+            <arg>--instance</arg>
+            <arg>--node</arg>
+            <arg>--primary</arg>
+            <arg>--secondary</arg>
+            <arg>--all</arg>
+          </group>
           <arg>--submit</arg>
-          <arg choice="req"><replaceable>instance</replaceable></arg>
+          <arg choice="opt" rep="repeat"><replaceable>instance</replaceable></arg>
         </cmdsynopsis>
 
         <para>
-          Reinstalls the operating system on the given instance. The
-          instance must be stopped when running this command. If the
+          Reinstalls the operating system on the given instance(s). The
+          instance(s) must be stopped when running this command. If the
           <option>--os-type</option> is specified, the operating
           system is changed.
         </para>
 
         <para>
-          Since reinstall is potentially dangerous command, the user
-          will be required to confirm this action, unless the
-          <option>-f</option> flag is passed.
-        </para>
-
-        <para>
           The <option>--select-os</option> option switches to an
           interactive OS reinstall. The user is prompted to select the OS
           template from the list of available OS templates.
         </para>
 
         <para>
+          Since this is a potentially dangerous command, the user will
+          be required to confirm this action, unless the
+          <option>-f</option> flag is passed. When multiple instances
+          are selected (either by passing multiple arguments or by
+          using the <option>--node</option>,
+          <option>--primary</option>, <option>--secondary</option> or
+          <option>--all</option> options), the user must pass both the
+          <option>--force</option> and
+          <option>--force-multiple</option> options to skip the
+          interactive confirmation.
+        </para>
+
+        <para>
           The <option>--submit</option> option is used to send the job to
           the master daemon but not wait for its completion. The job
           ID will be shown so that it can be examined via
index bf81ec7..2fa7307 100644 (file)
     <refsect2>
       <title>INFO</title>
       <cmdsynopsis>
-        <command>cancel</command>
+        <command>info</command>
         <arg choice="req" rep="repeat"><replaceable>id</replaceable></arg>
       </cmdsynopsis>
 
       </para>
 
     </refsect2>
+
+    <refsect2>
+      <title>WATCH</title>
+      <cmdsynopsis>
+        <command>watch</command>
+        <arg>id</arg>
+      </cmdsynopsis>
+
+      <para>
+        This command follows the output of the job by the given
+        <replaceable>id</replaceable> and prints it.
+      </para>
+    </refsect2>
+
   </refsect1>
 
   &footer;
index a35042d..278a633 100644 (file)
@@ -91,7 +91,7 @@
         discussion in <citerefentry>
         <refentrytitle>gnt-cluster</refentrytitle>
         <manvolnum>8</manvolnum> </citerefentry> for more
-        informations.
+        information.
       </para>
 
       <para>
diff --git a/pylintrc b/pylintrc
new file mode 100644 (file)
index 0000000..61163e7
--- /dev/null
+++ b/pylintrc
@@ -0,0 +1,78 @@
+# Configuration file for pylint (http://www.logilab.org/project/pylint). See
+# http://www.logilab.org/card/pylintfeatures for more detailed variable
+# descriptions.
+
+[MASTER]
+profile = no
+ignore =
+persistent = no
+cache-size = 50000
+load-plugins =
+
+[REPORTS]
+output-format = colorized
+include-ids = no
+files-output = no
+reports = no
+evaluation = 10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10)
+comment = yes
+
+[BASIC]
+required-attributes =
+no-docstring-rgx = __.*__
+module-rgx = (([a-z_][a-z0-9_]*)|([A-Z][a-zA-Z0-9]+))$
+const-rgx = ((_{0,2}[A-Z][A-Z0-9_]*)|(__.*__))$
+class-rgx = _?[A-Z][a-zA-Z0-9]+$
+function-rgx = (_?([A-Z]+[a-z0-9]+([A-Z]+[a-z0-9]*)*)|main)$
+method-rgx = (_{0,2}[A-Z]+[a-z0-9]+([A-Z]+[a-z0-9]*)*|__.*__)$
+attr-rgx = [a-z_][a-z0-9_]{1,30}$
+argument-rgx = [a-z_][a-z0-9_]*$
+variable-rgx = (_?([a-z_][a-z0-9_]*)|([A-Z0-9_]+))$
+inlinevar-rgx = [A-Za-z_][A-Za-z0-9_]*$
+good-names = i,j,k,_
+bad-names = foo,bar,baz,toto,tutu,tata
+bad-functions =
+
+[TYPECHECK]
+ignore-mixin-members = yes
+zope = no
+acquired-members =
+
+[VARIABLES]
+init-import = no
+dummy-variables-rgx = _
+additional-builtins =
+
+[CLASSES]
+ignore-iface-methods =
+defining-attr-methods = __init__,__new__,setUp
+
+[DESIGN]
+max-args = 6
+max-locals = 15
+max-returns = 6
+max-branchs = 12
+max-statements = 50
+max-parents = 7
+max-attributes = 7
+min-public-methods = 2
+max-public-methods = 20
+
+[IMPORTS]
+deprecated-modules = regsub,string,TERMIOS,Bastion,rexec
+import-graph =
+ext-import-graph =
+int-import-graph =
+
+[FORMAT]
+max-line-length = 80
+max-module-lines = 1000
+indent-string = "  "
+
+[MISCELLANEOUS]
+notes = FIXME,XXX,TODO
+
+[SIMILARITIES]
+min-similarity-lines = 4
+ignore-comments = yes
+ignore-docstrings = yes
index 7c668c5..76b96f1 100644 (file)
@@ -58,19 +58,7 @@ def Enabled():
   """Return whether remote API tests should be run.
 
   """
-  return constants.RAPI_ENABLE and qa_config.TestEnabled('rapi')
-
-
-def PrintRemoteAPIWarning():
-  """Print warning if remote API is not enabled.
-
-  """
-  if constants.RAPI_ENABLE or not qa_config.TestEnabled('rapi'):
-    return
-  msg = ("Remote API is not enabled in this Ganeti build. Please run"
-         " `configure [...] --enable-rapi'.")
-  print
-  print qa_utils.FormatWarning(msg)
+  return qa_config.TestEnabled('rapi')
 
 
 def _DoTests(uris):
index b5fd166..2d53d1d 100755 (executable)
@@ -93,11 +93,6 @@ def InitCluster(opts, args):
         hvparams[hv][parameter] = constants.HVC_DEFAULTS[hv][parameter]
     utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)
 
-  for hv in hvlist:
-    if hv not in constants.HYPER_TYPES:
-      ToStderr("invalid hypervisor: %s", hv)
-      return 1
-
   bootstrap.InitCluster(cluster_name=args[0],
                         secondary_ip=opts.secondary_ip,
                         vg_name=vg_name,
@@ -110,6 +105,7 @@ def InitCluster(opts, args):
                         hvparams=hvparams,
                         beparams=beparams,
                         candidate_pool_size=opts.candidate_pool_size,
+                        modify_etc_hosts=opts.modify_etc_hosts,
                         )
   return 0
 
@@ -232,6 +228,13 @@ def ShowClusterConfig(opts, args):
   ToStdout("Architecture (this node): %s (%s)",
            result["architecture"][0], result["architecture"][1])
 
+  if result["tags"]:
+    tags = ", ".join(utils.NiceSort(result["tags"]))
+  else:
+    tags = "(none)"
+
+  ToStdout("Tags: %s", tags)
+
   ToStdout("Default hypervisor: %s", result["default_hypervisor"])
   ToStdout("Enabled hypervisors: %s", ", ".join(result["enabled_hypervisors"]))
 
@@ -587,6 +590,10 @@ commands = {
                         help="No support for lvm based instances"
                              " (cluster-wide)",
                         action="store_false", default=True,),
+            make_option("--no-etc-hosts", dest="modify_etc_hosts",
+                        help="Don't modify /etc/hosts"
+                             " (cluster-wide)",
+                        action="store_false", default=True,),
             make_option("--enabled-hypervisors", dest="enabled_hypervisors",
                         help="Comma-separated list of hypervisors",
                         type="string", default=None),
index d3bf054..df48e60 100755 (executable)
@@ -71,19 +71,37 @@ def GenericOpCodes(opts, args):
 
   """
   cl = cli.GetClient()
-  job_data = []
-  job_ids = []
-  for fname in args:
-    op_data = simplejson.loads(open(fname).read())
-    op_list = [opcodes.OpCode.LoadOpCode(val) for val in op_data]
-    job_data.append((fname, op_list))
-  for fname, op_list in job_data:
-    jid = cli.SendJob(op_list, cl=cl)
-    ToStdout("File '%s', job id: %s", fname, jid)
-    job_ids.append(jid)
-  for jid in job_ids:
-    ToStdout("Waiting for job id %s", jid)
-    cli.PollJob(jid, cl=cl)
+  jex = cli.JobExecutor(cl=cl, verbose=opts.verbose)
+
+  job_cnt = 0
+  op_cnt = 0
+  if opts.timing_stats:
+    ToStdout("Loading...")
+  for job_idx in range(opts.rep_job):
+    for fname in args:
+      op_data = simplejson.loads(open(fname).read())
+      op_list = [opcodes.OpCode.LoadOpCode(val) for val in op_data]
+      op_list = op_list * opts.rep_op
+      jex.QueueJob("file %s/%d" % (fname, job_idx), *op_list)
+      op_cnt += len(op_list)
+      job_cnt += 1
+
+  if opts.timing_stats:
+    t1 = time.time()
+    ToStdout("Submitting...")
+  jex.SubmitPending()
+
+  if opts.timing_stats:
+    t2 = time.time()
+    ToStdout("Executing...")
+  jex.GetResults()
+  if opts.timing_stats:
+    t3 = time.time()
+    ToStdout("C:op     %4d" % op_cnt)
+    ToStdout("C:job    %4d" % job_cnt)
+    ToStdout("T:submit %4.4f" % (t2-t1))
+    ToStdout("T:exec   %4.4f" % (t3-t2))
+    ToStdout("T:total  %4.4f" % (t3-t1))
   return 0
 
 
@@ -148,6 +166,20 @@ commands = {
             "[opts...] <duration>", "Executes a TestDelay OpCode"),
   'submit-job': (GenericOpCodes, ARGS_ATLEAST(1),
                  [DEBUG_OPT,
+                  make_option("--op-repeat", type="int", default="1",
+                              dest="rep_op",
+                              help="Repeat the opcode sequence this number"
+                              " of times"),
+                  make_option("--job-repeat", type="int", default="1",
+                              dest="rep_job",
+                              help="Repeat the job this number"
+                              " of times"),
+                  make_option("-v", "--verbose", default=False,
+                              action="store_true",
+                              help="Make the operation more verbose"),
+                  make_option("--timing-stats", default=False,
+                              action="store_true",
+                              help="Show timing stats"),
                   ],
                  "<op_list_file...>", "Submits jobs built from json files"
                  " containing a list of serialized opcodes"),
index e16fed7..0563377 100755 (executable)
@@ -116,7 +116,7 @@ def _ExpandMultiNames(mode, names, client=None):
   return inames
 
 
-def _ConfirmOperation(inames, text):
+def _ConfirmOperation(inames, text, extra=""):
   """Ask the user to confirm an operation on a list of instances.
 
   This function is used to request confirmation for doing an operation
@@ -133,8 +133,8 @@ def _ConfirmOperation(inames, text):
 
   """
   count = len(inames)
-  msg = ("The %s will operate on %d instances.\n"
-         "Do you want to continue?" % (text, count))
+  msg = ("The %s will operate on %d instances.\n%s"
+         "Do you want to continue?" % (text, count, extra))
   affected = ("\nAffected instances:\n" +
               "\n".join(["  %s" % name for name in inames]))
 
@@ -160,7 +160,7 @@ def _EnsureInstancesExist(client, names):
   This function will raise an OpPrereqError in case they don't
   exist. Otherwise it will exit cleanly.
 
-  @type client: L{luxi.Client}
+  @type client: L{ganeti.luxi.Client}
   @param client: the client to use for the query
   @type names: list
   @param names: the list of instance names to query
@@ -441,6 +441,8 @@ def BatchCreate(opts, args):
     ToStderr("Can't parse the instance definition file: %s" % str(err))
     return 1
 
+  jex = JobExecutor()
+
   # Iterate over the instances and do:
   #  * Populate the specs with default value
   #  * Validate the instance specs
@@ -486,7 +488,9 @@ def BatchCreate(opts, args):
                                   file_storage_dir=specs['file_storage_dir'],
                                   file_driver=specs['file_driver'])
 
-    ToStdout("%s: %s", name, cli.SendJob([op]))
+    jex.QueueJob(name, op)
+  # we never want to wait, just show the submitted job IDs
+  jex.WaitOrShow(False)
 
   return 0
 
@@ -502,8 +506,15 @@ def ReinstallInstance(opts, args):
   @return: the desired exit code
 
   """
-  instance_name = args[0]
+  # first, compute the desired name list
+  if opts.multi_mode is None:
+    opts.multi_mode = _SHUTDOWN_INSTANCES
+
+  inames = _ExpandMultiNames(opts.multi_mode, args)
+  if not inames:
+    raise errors.OpPrereqError("Selection filter does not match any instances")
 
+  # second, if requested, ask for an OS
   if opts.select_os is True:
     op = opcodes.OpDiagnoseOS(output_fields=["name", "valid"], names=[])
     result = SubmitOpCode(op)
@@ -525,23 +536,35 @@ def ReinstallInstance(opts, args):
                        choices)
 
     if selected == 'exit':
-      ToStdout("User aborted reinstall, exiting")
+      ToStderr("User aborted reinstall, exiting")
       return 1
 
     os_name = selected
   else:
     os_name = opts.os
 
-  if not opts.force:
-    usertext = ("This will reinstall the instance %s and remove"
-                " all data. Continue?") % instance_name
-    if not AskUser(usertext):
+  # third, get confirmation: multi-reinstall requires --force-multi
+  # *and* --force, single-reinstall just --force
+  multi_on = opts.multi_mode != _SHUTDOWN_INSTANCES or len(inames) > 1
+  if multi_on:
+    warn_msg = "Note: this will remove *all* data for the below instances!\n"
+    if not ((opts.force_multi and opts.force) or
+            _ConfirmOperation(inames, "reinstall", extra=warn_msg)):
       return 1
+  else:
+    if not opts.force:
+      usertext = ("This will reinstall the instance %s and remove"
+                  " all data. Continue?") % inames[0]
+      if not AskUser(usertext):
+        return 1
+
+  jex = JobExecutor(verbose=multi_on)
+  for instance_name in inames:
+    op = opcodes.OpReinstallInstance(instance_name=instance_name,
+                                     os_type=os_name)
+    jex.QueueJob(instance_name, op)
 
-  op = opcodes.OpReinstallInstance(instance_name=instance_name,
-                                   os_type=os_name)
-  SubmitOrSend(op, opts)
-
+  jex.WaitOrShow(not opts.submit_only)
   return 0
 
 
@@ -1374,8 +1397,11 @@ commands = {
            " The default field"
            " list is (in order): %s." % ", ".join(_LIST_DEF_FIELDS),
            ),
-  'reinstall': (ReinstallInstance, ARGS_ONE,
+  'reinstall': (ReinstallInstance, ARGS_ANY,
                 [DEBUG_OPT, FORCE_OPT, os_opt,
+                 m_force_multi,
+                 m_node_opt, m_pri_node_opt, m_sec_node_opt,
+                 m_clust_opt, m_inst_opt,
                  make_option("--select-os", dest="select_os",
                              action="store_true", default=False,
                              help="Interactive OS reinstall, lists available"
index 2da75a3..1402583 100755 (executable)
@@ -29,6 +29,7 @@ from ganeti.cli import *
 from ganeti import constants
 from ganeti import errors
 from ganeti import utils
+from ganeti import cli
 
 
 #: default list of fields for L{ListJobs}
@@ -312,6 +313,32 @@ def ShowJobs(opts, args):
   return 0
 
 
+def WatchJob(opts, args):
+  """Follow a job and print its output as it arrives.
+
+  @param opts: the command line options selected by the user
+  @type args: list
+  @param args: Contains the job ID
+  @rtype: int
+  @return: the desired exit code
+
+  """
+  job_id = args[0]
+
+  msg = ("Output from job %s follows" % job_id)
+  ToStdout(msg)
+  ToStdout("-" * len(msg))
+
+  retcode = 0
+  try:
+    cli.PollJob(job_id)
+  except errors.GenericError, err:
+    (retcode, job_result) = cli.FormatError(err)
+    ToStderr("Job %s failed: %s", job_id, job_result)
+
+  return retcode
+
+
 commands = {
   'list': (ListJobs, ARGS_ANY,
             [DEBUG_OPT, NOHDR_OPT, SEP_OPT, FIELDS_OPT],
@@ -336,6 +363,9 @@ commands = {
   'info': (ShowJobs, ARGS_ANY, [DEBUG_OPT],
            "<job-id> [<job-id> ...]",
            "Show detailed information about the specified jobs"),
+  'watch': (WatchJob, ARGS_ONE, [DEBUG_OPT],
+           "<job-id>",
+           "Follows a job and prints its output as it arrives"),
   }
 
 
index bb244d5..90ec77a 100755 (executable)
@@ -189,7 +189,7 @@ def EvacuateNode(opts, args):
 
   cnt = [dst_node, iallocator].count(None)
   if cnt != 1:
-    raise errors.OpPrereqError("One and only one of the -n and -i"
+    raise errors.OpPrereqError("One and only one of the -n and -I"
                                " options must be passed")
 
   selected_fields = ["name", "sinst_list"]
@@ -541,7 +541,7 @@ commands = {
                           choices=('yes', 'no'), default=None,
                           help="Set the drained flag on the node"),
               ],
-             "<instance>", "Alters the parameters of an instance"),
+             "<node_name>", "Alters the parameters of a node"),
   'remove': (RemoveNode, ARGS_ONE, [DEBUG_OPT],
              "<node_name>", "Removes a node from the cluster"),
   'volumes': (ListVolumes, ARGS_ANY,
index 24c9491..37bcb20 100755 (executable)
@@ -69,6 +69,7 @@ class TestConfigRunner(unittest.TestCase):
       default_bridge=constants.DEFAULT_BRIDGE,
       tcpudp_port_pool=set(),
       default_hypervisor=constants.HT_FAKE,
+      enabled_hypervisors=[constants.HT_FAKE],
       master_node=me.name,
       master_ip="127.0.0.1",
       master_netdev=constants.DEFAULT_BRIDGE,
index d0ef877..b38680f 100755 (executable)
@@ -41,11 +41,16 @@ from ganeti import utils
 
 USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")
 
+MAX_RETRIES = 3
 
 class InstanceDown(Exception):
   """The checked instance was not up"""
 
 
+class BurninFailure(Exception):
+  """Failure detected during burning"""
+
+
 def Usage():
   """Shows program usage information and exits the program."""
 
@@ -106,6 +111,9 @@ class Burner(object):
     self.to_rem = []
     self.queued_ops = []
     self.opts = None
+    self.queue_retry = False
+    self.disk_count = self.disk_growth = self.disk_size = None
+    self.hvp = self.bep = None
     self.ParseOptions()
     self.cl = cli.GetClient()
     self.GetState()
@@ -125,7 +133,39 @@ class Burner(object):
     if self.opts.verbose:
       Log(msg, indent=3)
 
-  def ExecOp(self, *ops):
+  def MaybeRetry(self, retry_count, msg, fn, *args):
+    """Possibly retry a given function execution.
+
+    @type retry_count: int
+    @param retry_count: retry counter:
+        - 0: non-retryable action
+        - 1: last retry for a retryable action
+        - MAX_RETRIES: original try for a retryable action
+    @type msg: str
+    @param msg: the kind of the operation
+    @type fn: callable
+    @param fn: the function to be called
+
+    """
+    try:
+      val = fn(*args)
+      if retry_count > 0 and retry_count < MAX_RETRIES:
+        Log("Idempotent %s succeeded after %d retries" %
+            (msg, MAX_RETRIES - retry_count))
+      return val
+    except Exception, err:
+      if retry_count == 0:
+        Log("Non-idempotent %s failed, aborting" % (msg, ))
+        raise
+      elif retry_count == 1:
+        Log("Idempotent %s repeated failure, aborting" % (msg, ))
+        raise
+      else:
+        Log("Idempotent %s failed, retry #%d/%d: %s" %
+            (msg, MAX_RETRIES - retry_count + 1, MAX_RETRIES, err))
+        self.MaybeRetry(retry_count - 1, msg, fn, *args)
+
+  def _ExecOp(self, *ops):
     """Execute one or more opcodes and manage the exec buffer.
 
     @result: if only opcode has been passed, we return its result;
@@ -139,20 +179,48 @@ class Burner(object):
     else:
       return results
 
+  def ExecOp(self, retry, *ops):
+    """Execute one or more opcodes and manage the exec buffer.
+
+    @result: if only opcode has been passed, we return its result;
+        otherwise we return the list of results
+
+    """
+    if retry:
+      rval = MAX_RETRIES
+    else:
+      rval = 0
+    return self.MaybeRetry(rval, "opcode", self._ExecOp, *ops)
+
   def ExecOrQueue(self, name, *ops):
     """Execute an opcode and manage the exec buffer."""
     if self.opts.parallel:
       self.queued_ops.append((ops, name))
     else:
-      return self.ExecOp(*ops)
+      return self.ExecOp(self.queue_retry, *ops)
+
+  def StartBatch(self, retry):
+    """Start a new batch of jobs.
+
+    @param retry: whether this is a retryable batch
+
+    """
+    self.queued_ops = []
+    self.queue_retry = retry
 
   def CommitQueue(self):
     """Execute all submitted opcodes in case of parallel burnin"""
     if not self.opts.parallel:
       return
 
+    if self.queue_retry:
+      rval = MAX_RETRIES
+    else:
+      rval = 0
+
     try:
-      results = self.ExecJobSet(self.queued_ops)
+      results = self.MaybeRetry(rval, "jobset", self.ExecJobSet,
+                                self.queued_ops)
     finally:
       self.queued_ops = []
     return results
@@ -171,10 +239,45 @@ class Burner(object):
     results = []
     for jid, (_, iname) in zip(job_ids, jobs):
       Log("waiting for job %s for %s" % (jid, iname), indent=2)
-      results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback))
-
+      try:
+        results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback))
+      except Exception, err:
+        Log("Job for %s failed: %s" % (iname, err))
+    if len(results) != len(jobs):
+      raise BurninFailure()
     return results
 
+  def _DoCheckInstances(fn):
+    """Decorator for checking instances.
+
+    """
+    def wrapper(self, *args, **kwargs):
+      val = fn(self, *args, **kwargs)
+      for instance in self.instances:
+        self._CheckInstanceAlive(instance)
+      return val
+
+    return wrapper
+
+  def _DoBatch(retry):
+    """Decorator for possible batch operations.
+
+    Must come after the _DoCheckInstances decorator (if any).
+
+    @param retry: whether this is a retryable batch, will be
+        passed to StartBatch
+
+    """
+    def wrap(fn):
+      def batched(self, *args, **kwargs):
+        self.StartBatch(retry)
+        val = fn(self, *args, **kwargs)
+        self.CommitQueue()
+        return val
+      return batched
+
+    return wrap
+
   def ParseOptions(self):
     """Parses the command line options.
 
@@ -325,14 +428,14 @@ class Burner(object):
     try:
       op = opcodes.OpQueryNodes(output_fields=["name", "offline", "drained"],
                                 names=names, use_locking=True)
-      result = self.ExecOp(op)
+      result = self.ExecOp(True, op)
     except errors.GenericError, err:
       err_code, msg = cli.FormatError(err)
       Err(msg, exit_code=err_code)
     self.nodes = [data[0] for data in result if not (data[1] or data[2])]
 
-    result = self.ExecOp(opcodes.OpDiagnoseOS(output_fields=["name", "valid"],
-                                              names=[]))
+    op_diagos = opcodes.OpDiagnoseOS(output_fields=["name", "valid"], names=[])
+    result = self.ExecOp(True, op_diagos)
 
     if not result:
       Err("Can't get the OS list")
@@ -343,6 +446,8 @@ class Burner(object):
     if self.opts.os not in os_set:
       Err("OS '%s' not found" % self.opts.os)
 
+  @_DoCheckInstances
+  @_DoBatch(False)
   def BurnCreateInstances(self):
     """Create the given instances.
 
@@ -388,11 +493,7 @@ class Burner(object):
       self.ExecOrQueue(instance, op)
       self.to_rem.append(instance)
 
-    self.CommitQueue()
-
-    for instance in self.instances:
-      self._CheckInstanceAlive(instance)
-
+  @_DoBatch(False)
   def BurnGrowDisks(self):
     """Grow both the os and the swap disks by the requested amount, if any."""
     Log("Growing disks")
@@ -404,8 +505,8 @@ class Burner(object):
                                   amount=growth, wait_for_sync=True)
           Log("increase disk/%s by %s MB" % (idx, growth), indent=2)
           self.ExecOrQueue(instance, op)
-    self.CommitQueue()
 
+  @_DoBatch(True)
   def BurnReplaceDisks1D8(self):
     """Replace disks on primary and secondary for drbd8."""
     Log("Replacing disks on the same nodes")
@@ -419,8 +520,8 @@ class Burner(object):
         Log("run %s" % mode, indent=2)
         ops.append(op)
       self.ExecOrQueue(instance, *ops)
-    self.CommitQueue()
 
+  @_DoBatch(True)
   def BurnReplaceDisks2(self):
     """Replace secondary node."""
     Log("Changing the secondary node")
@@ -442,8 +543,9 @@ class Burner(object):
                                   disks=[i for i in range(self.disk_count)])
       Log("run %s %s" % (mode, msg), indent=2)
       self.ExecOrQueue(instance, op)
-    self.CommitQueue()
 
+  @_DoCheckInstances
+  @_DoBatch(False)
   def BurnFailover(self):
     """Failover the instances."""
     Log("Failing over instances")
@@ -453,10 +555,8 @@ class Burner(object):
                                       ignore_consistency=False)
 
       self.ExecOrQueue(instance, op)
-    self.CommitQueue()
-    for instance in self.instances:
-      self._CheckInstanceAlive(instance)
 
+  @_DoBatch(False)
   def BurnMigrate(self):
     """Migrate the instances."""
     Log("Migrating instances")
@@ -469,8 +569,9 @@ class Burner(object):
                                       cleanup=True)
       Log("migration and migration cleanup", indent=2)
       self.ExecOrQueue(instance, op1, op2)
-    self.CommitQueue()
 
+  @_DoCheckInstances
+  @_DoBatch(False)
   def BurnImportExport(self):
     """Export the instance, delete it, and import it back.
 
@@ -486,7 +587,7 @@ class Burner(object):
       # read the full name of the instance
       nam_op = opcodes.OpQueryInstances(output_fields=["name"],
                                         names=[instance], use_locking=True)
-      full_name = self.ExecOp(nam_op)[0][0]
+      full_name = self.ExecOp(False, nam_op)[0][0]
 
       if self.opts.iallocator:
         pnode = snode = None
@@ -535,10 +636,6 @@ class Burner(object):
       Log("remove export", indent=2)
       self.ExecOrQueue(instance, exp_op, rem_op, imp_op, erem_op)
 
-    self.CommitQueue()
-    for instance in self.instances:
-      self._CheckInstanceAlive(instance)
-
   def StopInstanceOp(self, instance):
     """Stop given instance."""
     return opcodes.OpShutdownInstance(instance_name=instance)
@@ -552,6 +649,8 @@ class Burner(object):
     return opcodes.OpRenameInstance(instance_name=instance,
                                     new_name=instance_new)
 
+  @_DoCheckInstances
+  @_DoBatch(True)
   def BurnStopStart(self):
     """Stop/start the instances."""
     Log("Stopping and starting instances")
@@ -561,11 +660,7 @@ class Burner(object):
       op2 = self.StartInstanceOp(instance)
       self.ExecOrQueue(instance, op1, op2)
 
-    self.CommitQueue()
-
-    for instance in self.instances:
-      self._CheckInstanceAlive(instance)
-
+  @_DoBatch(False)
   def BurnRemove(self):
     """Remove the instances."""
     Log("Removing instances")
@@ -575,8 +670,6 @@ class Burner(object):
                                     ignore_failures=True)
       self.ExecOrQueue(instance, op)
 
-    self.CommitQueue()
-
   def BurnRename(self):
     """Rename the instances.
 
@@ -594,11 +687,13 @@ class Burner(object):
       op_rename2 = self.RenameInstanceOp(rename, instance)
       op_start1 = self.StartInstanceOp(rename)
       op_start2 = self.StartInstanceOp(instance)
-      self.ExecOp(op_stop1, op_rename1, op_start1)
+      self.ExecOp(False, op_stop1, op_rename1, op_start1)
       self._CheckInstanceAlive(rename)
-      self.ExecOp(op_stop2, op_rename2, op_start2)
+      self.ExecOp(False, op_stop2, op_rename2, op_start2)
       self._CheckInstanceAlive(instance)
 
+  @_DoCheckInstances
+  @_DoBatch(True)
   def BurnReinstall(self):
     """Reinstall the instances."""
     Log("Reinstalling instances")
@@ -613,11 +708,8 @@ class Burner(object):
       op4 = self.StartInstanceOp(instance)
       self.ExecOrQueue(instance, op1, op2, op3, op4)
 
-    self.CommitQueue()
-
-    for instance in self.instances:
-      self._CheckInstanceAlive(instance)
-
+  @_DoCheckInstances
+  @_DoBatch(True)
   def BurnReboot(self):
     """Reboot the instances."""
     Log("Rebooting instances")
@@ -632,11 +724,8 @@ class Burner(object):
         ops.append(op)
       self.ExecOrQueue(instance, *ops)
 
-    self.CommitQueue()
-
-    for instance in self.instances:
-      self._CheckInstanceAlive(instance)
-
+  @_DoCheckInstances
+  @_DoBatch(True)
   def BurnActivateDisks(self):
     """Activate and deactivate disks of the instances."""
     Log("Activating/deactivating disks")
@@ -650,10 +739,9 @@ class Burner(object):
       Log("activate disks when offline", indent=2)
       Log("deactivate disks (when offline)", indent=2)
       self.ExecOrQueue(instance, op_act, op_stop, op_act, op_deact, op_start)
-    self.CommitQueue()
-    for instance in self.instances:
-      self._CheckInstanceAlive(instance)
 
+  @_DoCheckInstances
+  @_DoBatch(False)
   def BurnAddRemoveDisks(self):
     """Add and remove an extra disk for the instances."""
     Log("Adding and removing disks")
@@ -669,10 +757,8 @@ class Burner(object):
       Log("adding a disk", indent=2)
       Log("removing last disk", indent=2)
       self.ExecOrQueue(instance, op_add, op_stop, op_rem, op_start)
-    self.CommitQueue()
-    for instance in self.instances:
-      self._CheckInstanceAlive(instance)
 
+  @_DoBatch(False)
   def BurnAddRemoveNICs(self):
     """Add and remove an extra NIC for the instances."""
     Log("Adding and removing NICs")
@@ -685,7 +771,6 @@ class Burner(object):
       Log("adding a NIC", indent=2)
       Log("removing last NIC", indent=2)
       self.ExecOrQueue(instance, op_add, op_rem)
-    self.CommitQueue()
 
   def _CheckInstanceAlive(self, instance):
     """Check if an instance is alive by doing http checks.
@@ -784,7 +869,14 @@ class Burner(object):
         Log(self.GetFeedbackBuf())
         Log("\n\n")
       if not self.opts.keep_instances:
-        self.BurnRemove()
+        try:
+          self.BurnRemove()
+        except Exception, err:
+          if has_err: # already detected errors, so errors in removal
+                      # are quite expected
+            Log("Note: error detected during instance remove: %s" % str(err))
+          else: # non-expected error
+            raise
 
     return 0
 
index 8af2f61..a10296e 100755 (executable)
@@ -46,6 +46,7 @@ import time
 
 from ganeti.utils import RunCmd
 from ganeti import constants
+from ganeti import cli
 
 USAGE = ("\tlvmstrap diskinfo\n"
          "\tlvmstrap [--vgname=NAME] [--allow-removable]"
@@ -267,7 +268,7 @@ def CheckSysDev(name, devnum):
    devnum: the device number, e.g. 0x803 (2051 in decimal) for sda3
 
   Returns:
-    None; failure of the check is signalled by raising a
+    None; failure of the check is signaled by raising a
       SysconfigError exception
   """
 
@@ -449,7 +450,7 @@ def GetMountInfo():
 
 
 def DevInfo(name, dev, mountinfo):
-  """Computes miscellaneous informations about a block device.
+  """Computes miscellaneous information about a block device.
 
   Args:
     name: the device name, e.g. sda
@@ -478,7 +479,7 @@ def DevInfo(name, dev, mountinfo):
 def ShowDiskInfo(opts):
   """Shows a nicely formatted block device list for this system.
 
-  This function shows the user a table with the informations gathered
+  This function shows the user a table with the information gathered
   by the other functions defined, in order to help the user make a
   choice about which disks should be allocated to our volume group.
 
@@ -487,8 +488,15 @@ def ShowDiskInfo(opts):
   dlist = GetDiskList(opts)
 
   print "------- Disk information -------"
-  print ("%5s %7s %4s %5s %-10s %s" %
-         ("Name", "Size[M]", "Used", "Mount", "LVM?", "Info"))
+  headers = {
+      "name": "Name",
+      "size": "Size[M]",
+      "used": "Used",
+      "mount": "Mount",
+      "lvm": "LVM?",
+      "info": "Info"
+      }
+  fields = ["name", "size", "used", "mount", "lvm", "info"]
 
   flatlist = []
   # Flatten the [(disk, [partition,...]), ...] list
@@ -501,6 +509,7 @@ def ShowDiskInfo(opts):
     for partname, partsize, partdev in parts:
       flatlist.append((partname, partsize, partdev, ""))
 
+  strlist = []
   for name, size, dev, in_use in flatlist:
     mp, vgname, fileinfo = DevInfo(name, dev, mounts)
     if mp is None:
@@ -515,8 +524,15 @@ def ShowDiskInfo(opts):
     if len(name) > 3:
       # Indent partitions
       name = " %s" % name
-    print ("%-5s %7.2f %-4s %-5s %-10s %s" %
-           (name, float(size) / 1024 / 1024, in_use, mp, lvminfo, fileinfo))
+
+    strlist.append([name, "%.2f" % (float(size) / 1024 / 1024),
+                    in_use, mp, lvminfo, fileinfo])
+
+  data = cli.GenerateTable(headers, fields, None,
+                           strlist, numfields=["size"])
+
+  for line in data:
+    print line
 
 
 def CheckReread(name):