#!/usr/bin/python
#
-# Copyright (C) 2006, 2007 Google Inc.
+# Copyright (C) 2006, 2007, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
+"""Debugging commands"""
-# pylint: disable-msg=W0401,W0614
+# pylint: disable-msg=W0401,W0614,C0103
# W0401: Wildcard import ganeti.cli
# W0614: Unused import %s from wildcard import (since we need cli)
+# C0103: Invalid name gnt-backup
import sys
import simplejson
import time
+import socket
+import logging
from ganeti.cli import *
from ganeti import cli
-from ganeti import opcodes
from ganeti import constants
+from ganeti import opcodes
from ganeti import utils
from ganeti import errors
+from ganeti import compat
+
+
+#: Default fields for L{ListLocks}
+_LIST_LOCKS_DEF_FIELDS = [
+ "name",
+ "mode",
+ "owner",
+ "pending",
+ ]
def Delay(opts, args):
delay = float(args[0])
op = opcodes.OpTestDelay(duration=delay,
on_master=opts.on_master,
- on_nodes=opts.on_nodes)
- SubmitOpCode(op)
+ on_nodes=opts.on_nodes,
+ repeat=opts.repeat)
+ SubmitOpCode(op, opts=opts)
return 0
def GenericOpCodes(opts, args):
"""Send any opcode to the master.
- @todo: The function is broken and needs to be converted to the
- current job queue API
@param opts: the command line options selected by the user
@type args: list
@param args: should contain only one element, the path of
"""
cl = cli.GetClient()
- jex = cli.JobExecutor(cl=cl, verbose=opts.verbose)
+ jex = cli.JobExecutor(cl=cl, verbose=opts.verbose, opts=opts)
job_cnt = 0
op_cnt = 0
ToStdout("Loading...")
for job_idx in range(opts.rep_job):
for fname in args:
+ # pylint: disable-msg=W0142
op_data = simplejson.loads(utils.ReadFile(fname))
op_list = [opcodes.OpCode.LoadOpCode(val) for val in op_data]
op_list = op_list * opts.rep_op
t1 = time.time()
ToStdout("Submitting...")
- jex.SubmitPending()
+ jex.SubmitPending(each=opts.each)
if opts.timing_stats:
t2 = time.time()
op = opcodes.OpTestAllocator(mode=opts.mode,
name=args[0],
+ evac_nodes=args,
mem_size=opts.mem,
disks=disks,
disk_template=opts.disk_template,
nics=nic_dict,
- os=opts.os_type,
+ os=opts.os,
vcpus=opts.vcpus,
tags=opts.tags,
direction=opts.direction,
- allocator=opts.allocator,
+ allocator=opts.iallocator,
)
- result = SubmitOpCode(op)
+ result = SubmitOpCode(op, opts=opts)
ToStdout("%s" % result)
return 0
+def _TestJobSubmission(opts):
+ """Tests submitting jobs.
+
+ """
+ ToStdout("Testing job submission")
+
+ testdata = [
+ (0, 0, constants.OP_PRIO_LOWEST),
+ (0, 0, constants.OP_PRIO_HIGHEST),
+ ]
+
+ for priority in (constants.OP_PRIO_SUBMIT_VALID |
+ frozenset([constants.OP_PRIO_LOWEST,
+ constants.OP_PRIO_HIGHEST])):
+ for offset in [-1, +1]:
+ testdata.extend([
+ (0, 0, priority + offset),
+ (3, 0, priority + offset),
+ (0, 3, priority + offset),
+ (4, 2, priority + offset),
+ ])
+
+ cl = cli.GetClient()
+
+ for before, after, failpriority in testdata:
+ ops = []
+ ops.extend([opcodes.OpTestDelay(duration=0) for _ in range(before)])
+ ops.append(opcodes.OpTestDelay(duration=0, priority=failpriority))
+ ops.extend([opcodes.OpTestDelay(duration=0) for _ in range(after)])
+
+ try:
+ cl.SubmitJob(ops)
+ except errors.GenericError, err:
+ if opts.debug:
+ ToStdout("Ignoring error: %s", err)
+ else:
+ raise errors.OpExecError("Submitting opcode with priority %s did not"
+ " fail when it should (allowed are %s)" %
+ (failpriority, constants.OP_PRIO_SUBMIT_VALID))
+
+ jobs = [
+ [opcodes.OpTestDelay(duration=0),
+ opcodes.OpTestDelay(duration=0, dry_run=False),
+ opcodes.OpTestDelay(duration=0, dry_run=True)],
+ ops,
+ ]
+ result = cl.SubmitManyJobs(jobs)
+ if not (len(result) == 2 and
+ compat.all(len(i) == 2 for i in result) and
+ compat.all(isinstance(i[1], basestring) for i in result) and
+ result[0][0] and not result[1][0]):
+ raise errors.OpExecError("Submitting multiple jobs did not work as"
+ " expected, result %s" % result)
+ assert len(result) == 2
+
+ ToStdout("Job submission tests were successful")
+
+
+class _JobQueueTestReporter(cli.StdioJobPollReportCb):
+ def __init__(self):
+ """Initializes this class.
+
+ """
+ cli.StdioJobPollReportCb.__init__(self)
+ self._expected_msgcount = 0
+ self._all_testmsgs = []
+ self._testmsgs = None
+ self._job_id = None
+
+ def GetTestMessages(self):
+ """Returns all test log messages received so far.
+
+ """
+ return self._all_testmsgs
+
+ def GetJobId(self):
+ """Returns the job ID.
+
+ """
+ return self._job_id
+
+ def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
+ """Handles a log message.
+
+ """
+ if self._job_id is None:
+ self._job_id = job_id
+ elif self._job_id != job_id:
+ raise errors.ProgrammerError("The same reporter instance was used for"
+ " more than one job")
+
+ if log_type == constants.ELOG_JQUEUE_TEST:
+ (sockname, test, arg) = log_msg
+ return self._ProcessTestMessage(job_id, sockname, test, arg)
+
+ elif (log_type == constants.ELOG_MESSAGE and
+ log_msg.startswith(constants.JQT_MSGPREFIX)):
+ if self._testmsgs is None:
+ raise errors.OpExecError("Received test message without a preceding"
+ " start message")
+ testmsg = log_msg[len(constants.JQT_MSGPREFIX):]
+ self._testmsgs.append(testmsg)
+ self._all_testmsgs.append(testmsg)
+ return
+
+ return cli.StdioJobPollReportCb.ReportLogMessage(self, job_id, serial,
+ timestamp, log_type,
+ log_msg)
+
+ def _ProcessTestMessage(self, job_id, sockname, test, arg):
+ """Handles a job queue test message.
+
+ """
+ if test not in constants.JQT_ALL:
+ raise errors.OpExecError("Received invalid test message %s" % test)
+
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ try:
+ sock.settimeout(30.0)
+
+ logging.debug("Connecting to %s", sockname)
+ sock.connect(sockname)
+
+ logging.debug("Checking status")
+ jobdetails = cli.GetClient().QueryJobs([job_id], ["status"])[0]
+ if not jobdetails:
+ raise errors.OpExecError("Can't find job %s" % job_id)
+
+ status = jobdetails[0]
+
+ logging.debug("Status of job %s is %s", job_id, status)
+
+ if test == constants.JQT_EXPANDNAMES:
+ if status != constants.JOB_STATUS_WAITLOCK:
+ raise errors.OpExecError("Job status while expanding names is '%s',"
+ " not '%s' as expected" %
+ (status, constants.JOB_STATUS_WAITLOCK))
+ elif test in (constants.JQT_EXEC, constants.JQT_LOGMSG):
+ if status != constants.JOB_STATUS_RUNNING:
+ raise errors.OpExecError("Job status while executing opcode is '%s',"
+ " not '%s' as expected" %
+ (status, constants.JOB_STATUS_RUNNING))
+
+ if test == constants.JQT_STARTMSG:
+ logging.debug("Expecting %s test messages", arg)
+ self._testmsgs = []
+ elif test == constants.JQT_LOGMSG:
+ if len(self._testmsgs) != arg:
+ raise errors.OpExecError("Received %s test messages when %s are"
+ " expected" % (len(self._testmsgs), arg))
+ finally:
+ logging.debug("Closing socket")
+ sock.close()
+
+
+def TestJobqueue(opts, _):
+ """Runs a few tests on the job queue.
+
+ """
+ _TestJobSubmission(opts)
+
+ (TM_SUCCESS,
+ TM_MULTISUCCESS,
+ TM_FAIL,
+ TM_PARTFAIL) = range(4)
+ TM_ALL = frozenset([TM_SUCCESS, TM_MULTISUCCESS, TM_FAIL, TM_PARTFAIL])
+
+ for mode in TM_ALL:
+ test_messages = [
+ "Testing mode %s" % mode,
+ "Hello World",
+ "A",
+ "",
+ "B"
+ "Foo|bar|baz",
+ utils.TimestampForFilename(),
+ ]
+
+ fail = mode in (TM_FAIL, TM_PARTFAIL)
+
+ if mode == TM_PARTFAIL:
+ ToStdout("Testing partial job failure")
+ ops = [
+ opcodes.OpTestJobqueue(notify_waitlock=True, notify_exec=True,
+ log_messages=test_messages, fail=False),
+ opcodes.OpTestJobqueue(notify_waitlock=True, notify_exec=True,
+ log_messages=test_messages, fail=False),
+ opcodes.OpTestJobqueue(notify_waitlock=True, notify_exec=True,
+ log_messages=test_messages, fail=True),
+ opcodes.OpTestJobqueue(notify_waitlock=True, notify_exec=True,
+ log_messages=test_messages, fail=False),
+ ]
+ expect_messages = 3 * [test_messages]
+ expect_opstatus = [
+ constants.OP_STATUS_SUCCESS,
+ constants.OP_STATUS_SUCCESS,
+ constants.OP_STATUS_ERROR,
+ constants.OP_STATUS_ERROR,
+ ]
+ expect_resultlen = 2
+ elif mode == TM_MULTISUCCESS:
+ ToStdout("Testing multiple successful opcodes")
+ ops = [
+ opcodes.OpTestJobqueue(notify_waitlock=True, notify_exec=True,
+ log_messages=test_messages, fail=False),
+ opcodes.OpTestJobqueue(notify_waitlock=True, notify_exec=True,
+ log_messages=test_messages, fail=False),
+ ]
+ expect_messages = 2 * [test_messages]
+ expect_opstatus = [
+ constants.OP_STATUS_SUCCESS,
+ constants.OP_STATUS_SUCCESS,
+ ]
+ expect_resultlen = 2
+ else:
+ if mode == TM_SUCCESS:
+ ToStdout("Testing job success")
+ expect_opstatus = [constants.OP_STATUS_SUCCESS]
+ elif mode == TM_FAIL:
+ ToStdout("Testing job failure")
+ expect_opstatus = [constants.OP_STATUS_ERROR]
+ else:
+ raise errors.ProgrammerError("Unknown test mode %s" % mode)
+
+ ops = [
+ opcodes.OpTestJobqueue(notify_waitlock=True,
+ notify_exec=True,
+ log_messages=test_messages,
+ fail=fail)
+ ]
+ expect_messages = [test_messages]
+ expect_resultlen = 1
+
+ cl = cli.GetClient()
+ cli.SetGenericOpcodeOpts(ops, opts)
+
+ # Send job to master daemon
+ job_id = cli.SendJob(ops, cl=cl)
+
+ reporter = _JobQueueTestReporter()
+ results = None
+
+ try:
+ results = cli.PollJob(job_id, cl=cl, reporter=reporter)
+ except errors.OpExecError, err:
+ if not fail:
+ raise
+ ToStdout("Ignoring error: %s", err)
+ else:
+ if fail:
+ raise errors.OpExecError("Job didn't fail when it should")
+
+ # Check length of result
+ if fail:
+ if results is not None:
+ raise errors.OpExecError("Received result from failed job")
+ elif len(results) != expect_resultlen:
+ raise errors.OpExecError("Received %s results (%s), expected %s" %
+ (len(results), results, expect_resultlen))
+
+ # Check received log messages
+ all_messages = [i for j in expect_messages for i in j]
+ if reporter.GetTestMessages() != all_messages:
+ raise errors.OpExecError("Received test messages don't match input"
+ " (input %r, received %r)" %
+ (all_messages, reporter.GetTestMessages()))
+
+ # Check final status
+ reported_job_id = reporter.GetJobId()
+ if reported_job_id != job_id:
+ raise errors.OpExecError("Reported job ID %s doesn't match"
+ "submission job ID %s" %
+ (reported_job_id, job_id))
+
+ jobdetails = cli.GetClient().QueryJobs([job_id], ["status", "opstatus"])[0]
+ if not jobdetails:
+ raise errors.OpExecError("Can't find job %s" % job_id)
+
+ if fail:
+ exp_status = constants.JOB_STATUS_ERROR
+ else:
+ exp_status = constants.JOB_STATUS_SUCCESS
+
+ (final_status, final_opstatus) = jobdetails
+ if final_status != exp_status:
+ raise errors.OpExecError("Final job status is %s, not %s as expected" %
+ (final_status, exp_status))
+ if len(final_opstatus) != len(ops):
+ raise errors.OpExecError("Did not receive status for all opcodes (got %s,"
+ " expected %s)" %
+ (len(final_opstatus), len(ops)))
+ if final_opstatus != expect_opstatus:
+ raise errors.OpExecError("Opcode status is %s, expected %s" %
+ (final_opstatus, expect_opstatus))
+
+ ToStdout("Job queue test successful")
+
+ return 0
+
+
+def ListLocks(opts, args): # pylint: disable-msg=W0613
+ """List all locks.
+
+ @param opts: the command line options selected by the user
+ @type args: list
+ @param args: should be an empty list
+ @rtype: int
+ @return: the desired exit code
+
+ """
+ selected_fields = ParseFields(opts.output, _LIST_LOCKS_DEF_FIELDS)
+
+ if not opts.no_headers:
+ headers = {
+ "name": "Name",
+ "mode": "Mode",
+ "owner": "Owner",
+ "pending": "Pending",
+ }
+ else:
+ headers = None
+
+ while True:
+ # Not reusing client as interval might be too long
+ output = GetClient().QueryLocks(selected_fields, False)
+
+ # change raw values to nicer strings
+ for row in output:
+ for idx, field in enumerate(selected_fields):
+ val = row[idx]
+
+ if field in ("mode", "owner", "pending") and not val:
+ val = "-"
+ elif field == "owner":
+ val = ",".join(val)
+ elif field == "pending":
+ val = utils.CommaJoin("%s:%s" % (mode, ",".join(threads))
+ for mode, threads in val)
+
+ row[idx] = str(val)
+
+ data = GenerateTable(separator=opts.separator, headers=headers,
+ fields=selected_fields, data=output)
+ for line in data:
+ ToStdout(line)
+
+ if not opts.interval:
+ break
+
+ ToStdout("")
+ time.sleep(opts.interval)
+
+ return 0
+
+
commands = {
- 'delay': (Delay, [ArgUnknown(min=1, max=1)],
- [DEBUG_OPT,
- cli_option("--no-master", dest="on_master", default=True,
- action="store_false",
- help="Do not sleep in the master code"),
- cli_option("-n", dest="on_nodes", default=[],
- action="append",
- help="Select nodes to sleep on"),
- ],
- "[opts...] <duration>", "Executes a TestDelay OpCode"),
- 'submit-job': (GenericOpCodes, [ArgFile(min=1)],
- [DEBUG_OPT, VERBOSE_OPT,
- cli_option("--op-repeat", type="int", default="1",
- dest="rep_op",
- help="Repeat the opcode sequence this number"
- " of times"),
- cli_option("--job-repeat", type="int", default="1",
- dest="rep_job",
- help="Repeat the job this number"
- " of times"),
- cli_option("--timing-stats", default=False,
- action="store_true",
- help="Show timing stats"),
- ],
- "<op_list_file...>", "Submits jobs built from json files"
- " containing a list of serialized opcodes"),
- 'allocator': (TestAllocator, ARGS_ONE_INSTANCE,
- [DEBUG_OPT,
- cli_option("--dir", dest="direction",
- default="in", choices=["in", "out"],
- help="Show allocator input (in) or allocator"
- " results (out)"),
- IALLOCATOR_OPT,
- cli_option("-m", "--mode", default="relocate",
- choices=["relocate", "allocate"],
- help="Request mode, either allocate or"
- " relocate"),
- cli_option("--mem", default=128, type="unit",
- help="Memory size for the instance (MiB)"),
- cli_option("--disks", default="4096,4096",
- help="Comma separated list of disk sizes (MiB)"),
- DISK_TEMPLATE_OPT,
- cli_option("--nics", default="00:11:22:33:44:55",
- help="Comma separated list of nics, each nic"
- " definition is of form mac/ip/bridge, if"
- " missing values are replace by None"),
- OS_OPT,
- cli_option("-p", "--vcpus", default=1, type="int",
- help="Select number of VCPUs for the instance"),
- cli_option("--tags", default=None,
- help="Comma separated list of tags"),
- ],
- "{opts...} <instance>", "Executes a TestAllocator OpCode"),
+ 'delay': (
+ Delay, [ArgUnknown(min=1, max=1)],
+ [cli_option("--no-master", dest="on_master", default=True,
+ action="store_false", help="Do not sleep in the master code"),
+ cli_option("-n", dest="on_nodes", default=[],
+ action="append", help="Select nodes to sleep on"),
+ cli_option("-r", "--repeat", type="int", default="0", dest="repeat",
+ help="Number of times to repeat the sleep"),
+ DRY_RUN_OPT, PRIORITY_OPT,
+ ],
+ "[opts...] <duration>", "Executes a TestDelay OpCode"),
+ 'submit-job': (
+ GenericOpCodes, [ArgFile(min=1)],
+ [VERBOSE_OPT,
+ cli_option("--op-repeat", type="int", default="1", dest="rep_op",
+ help="Repeat the opcode sequence this number of times"),
+ cli_option("--job-repeat", type="int", default="1", dest="rep_job",
+ help="Repeat the job this number of times"),
+ cli_option("--timing-stats", default=False,
+ action="store_true", help="Show timing stats"),
+ cli_option("--each", default=False, action="store_true",
+ help="Submit each job separately"),
+ DRY_RUN_OPT, PRIORITY_OPT,
+ ],
+ "<op_list_file...>", "Submits jobs built from json files"
+ " containing a list of serialized opcodes"),
+ 'allocator': (
+ TestAllocator, [ArgUnknown(min=1)],
+ [cli_option("--dir", dest="direction",
+ default="in", choices=["in", "out"],
+ help="Show allocator input (in) or allocator"
+ " results (out)"),
+ IALLOCATOR_OPT,
+ cli_option("-m", "--mode", default="relocate",
+ choices=["relocate", "allocate", "multi-evacuate"],
+ help="Request mode, either allocate or relocate"),
+ cli_option("--mem", default=128, type="unit",
+ help="Memory size for the instance (MiB)"),
+ cli_option("--disks", default="4096,4096",
+ help="Comma separated list of disk sizes (MiB)"),
+ DISK_TEMPLATE_OPT,
+ cli_option("--nics", default="00:11:22:33:44:55",
+ help="Comma separated list of nics, each nic"
+ " definition is of form mac/ip/bridge, if"
+ " missing values are replace by None"),
+ OS_OPT,
+ cli_option("-p", "--vcpus", default=1, type="int",
+ help="Select number of VCPUs for the instance"),
+ cli_option("--tags", default=None,
+ help="Comma separated list of tags"),
+ DRY_RUN_OPT, PRIORITY_OPT,
+ ],
+ "{opts...} <instance>", "Executes a TestAllocator OpCode"),
+ "test-jobqueue": (
+ TestJobqueue, ARGS_NONE, [PRIORITY_OPT],
+ "", "Test a few aspects of the job queue"),
+ "locks": (
+ ListLocks, ARGS_NONE, [NOHDR_OPT, SEP_OPT, FIELDS_OPT, INTERVAL_OPT],
+ "[--interval N]", "Show a list of locks in the master daemon"),
}