gnt-debug: Extend job queue tests
[ganeti-local] / scripts / gnt-debug
index c96deca..944ccdc 100755 (executable)
 import sys
 import simplejson
 import time
+import socket
+import logging
 
 from ganeti.cli import *
 from ganeti import cli
+from ganeti import constants
 from ganeti import opcodes
 from ganeti import utils
 from ganeti import errors
@@ -50,7 +53,8 @@ def Delay(opts, args):
   delay = float(args[0])
   op = opcodes.OpTestDelay(duration=delay,
                            on_master=opts.on_master,
-                           on_nodes=opts.on_nodes)
+                           on_nodes=opts.on_nodes,
+                           repeat=opts.repeat)
   SubmitOpCode(op, opts=opts)
 
   return 0
@@ -59,8 +63,6 @@ def Delay(opts, args):
 def GenericOpCodes(opts, args):
   """Send any opcode to the master.
 
-  @todo: The function is broken and needs to be converted to the
-      current job queue API
   @param opts: the command line options selected by the user
   @type args: list
   @param args: should contain only one element, the path of
@@ -156,6 +158,246 @@ def TestAllocator(opts, args):
   return 0
 
 
+class _JobQueueTestReporter(cli.StdioJobPollReportCb):
+  """Job poll reporter checking the messages of a job queue test.
+
+  """
+  def __init__(self):
+    """Initializes this class.
+
+    """
+    cli.StdioJobPollReportCb.__init__(self)
+    self._expected_msgcount = 0
+    self._all_testmsgs = []
+    self._testmsgs = None
+    self._job_id = None
+
+  def GetTestMessages(self):
+    """Returns all test log messages received so far.
+
+    """
+    return self._all_testmsgs
+
+  def GetJobId(self):
+    """Returns the ID of the job whose messages were reported, if any.
+
+    """
+    return self._job_id
+
+  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
+    """Handles a log message, intercepting job queue test messages.
+
+    """
+    if self._job_id is None:
+      self._job_id = job_id
+    elif self._job_id != job_id:
+      raise errors.ProgrammerError("The same reporter instance was used for"
+                                   " more than one job")
+
+    if log_type == constants.ELOG_JQUEUE_TEST:
+      (sockname, test, arg) = log_msg
+      return self._ProcessTestMessage(job_id, sockname, test, arg)
+    elif (log_type == constants.ELOG_MESSAGE and
+          log_msg.startswith(constants.JQT_MSGPREFIX)):
+      if self._testmsgs is None:
+        raise errors.OpExecError("Received test message without a preceding"
+                                 " start message")
+      testmsg = log_msg[len(constants.JQT_MSGPREFIX):]
+      self._testmsgs.append(testmsg)
+      self._all_testmsgs.append(testmsg)
+    else:
+      return cli.StdioJobPollReportCb.ReportLogMessage(self, job_id, serial,
+                                                       timestamp, log_type,
+                                                       log_msg)
+
+  def _ProcessTestMessage(self, job_id, sockname, test, arg):
+    """Checks the job state for a test message while connected to its socket.
+
+    """
+    if test not in constants.JQT_ALL:
+      raise errors.OpExecError("Received invalid test message %s" % test)
+
+    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+    try:
+      sock.settimeout(30.0)
+      logging.debug("Connecting to %s", sockname)
+      sock.connect(sockname)
+
+      logging.debug("Checking status")
+      jobdetails = cli.GetClient().QueryJobs([job_id], ["status"])[0]
+      if not jobdetails:
+        raise errors.OpExecError("Can't find job %s" % job_id)
+      status = jobdetails[0]
+      logging.debug("Status of job %s is %s", job_id, status)
+
+      if test == constants.JQT_EXPANDNAMES:
+        if status != constants.JOB_STATUS_WAITLOCK:
+          raise errors.OpExecError("Job status while expanding names is '%s',"
+                                   " not '%s' as expected" %
+                                   (status, constants.JOB_STATUS_WAITLOCK))
+      elif test in (constants.JQT_EXEC, constants.JQT_LOGMSG):
+        if status != constants.JOB_STATUS_RUNNING:
+          raise errors.OpExecError("Job status while executing opcode is '%s',"
+                                   " not '%s' as expected" %
+                                   (status, constants.JOB_STATUS_RUNNING))
+
+      if test == constants.JQT_STARTMSG:
+        logging.debug("Expecting %s test messages", arg)
+        self._testmsgs = []
+      elif test == constants.JQT_LOGMSG:
+        if self._testmsgs is None:
+          raise errors.OpExecError("Log message test without start message")
+        if len(self._testmsgs) != arg:
+          raise errors.OpExecError("Received %s test messages when %s are"
+                                   " expected" % (len(self._testmsgs), arg))
+    finally:
+      logging.debug("Closing socket")
+      sock.close()
+
+
+def TestJobqueue(opts, _):
+  """Runs a few tests on the job queue.
+
+  @param opts: the command line options selected by the user
+  @rtype: int
+  @return: 0 once all test modes passed
+  @raise errors.OpExecError: if a job does not behave as expected
+
+  """
+  (TM_SUCCESS,
+   TM_MULTISUCCESS,
+   TM_FAIL,
+   TM_PARTFAIL) = range(4)
+  TM_ALL = frozenset([TM_SUCCESS, TM_MULTISUCCESS, TM_FAIL, TM_PARTFAIL])
+
+  for mode in TM_ALL:
+    # Log messages every opcode of the test job is asked to emit
+    test_messages = [
+      "Testing mode %s" % mode,
+      "Hello World",
+      "A",
+      "",
+      "B",
+      "Foo|bar|baz",
+      utils.TimestampForFilename(),
+      ]
+
+    fail = mode in (TM_FAIL, TM_PARTFAIL)
+
+    if mode == TM_PARTFAIL:
+      ToStdout("Testing partial job failure")
+      # Only the third opcode fails; the fourth is then expected to end up
+      # with an error status too, without logging any test messages
+      ops = [opcodes.OpTestJobqueue(notify_waitlock=True, notify_exec=True,
+                                    log_messages=test_messages, fail=op_fail)
+             for op_fail in (False, False, True, False)]
+      expect_messages = 3 * [test_messages]
+      expect_opstatus = [
+        constants.OP_STATUS_SUCCESS,
+        constants.OP_STATUS_SUCCESS,
+        constants.OP_STATUS_ERROR,
+        constants.OP_STATUS_ERROR,
+        ]
+      expect_resultlen = 2
+    elif mode == TM_MULTISUCCESS:
+      ToStdout("Testing multiple successful opcodes")
+      ops = [opcodes.OpTestJobqueue(notify_waitlock=True, notify_exec=True,
+                                    log_messages=test_messages, fail=op_fail)
+             for op_fail in (False, False)]
+      expect_messages = 2 * [test_messages]
+      expect_opstatus = [
+        constants.OP_STATUS_SUCCESS,
+        constants.OP_STATUS_SUCCESS,
+        ]
+      expect_resultlen = 2
+    else:
+      if mode == TM_SUCCESS:
+        ToStdout("Testing job success")
+        expect_opstatus = [constants.OP_STATUS_SUCCESS]
+      elif mode == TM_FAIL:
+        ToStdout("Testing job failure")
+        expect_opstatus = [constants.OP_STATUS_ERROR]
+      else:
+        raise errors.ProgrammerError("Unknown test mode %s" % mode)
+
+      ops = [
+        opcodes.OpTestJobqueue(notify_waitlock=True,
+                               notify_exec=True,
+                               log_messages=test_messages,
+                               fail=fail)
+        ]
+      expect_messages = [test_messages]
+      expect_resultlen = 1
+
+    cl = cli.GetClient()
+    cli.SetGenericOpcodeOpts(ops, opts)
+
+    # Send job to master daemon
+    job_id = cli.SendJob(ops, cl=cl)
+
+    # Collects the test messages logged while the job is being polled
+    reporter = _JobQueueTestReporter()
+    results = None
+
+    # A job expected to fail must make PollJob raise OpExecError
+    try:
+      results = cli.PollJob(job_id, cl=cl, reporter=reporter)
+    except errors.OpExecError, err:
+      if not fail:
+        raise
+      ToStdout("Ignoring error: %s", err)
+    else:
+      if fail:
+        raise errors.OpExecError("Job didn't fail when it should")
+
+    # Check length of result
+    if fail:
+      if results is not None:
+        raise errors.OpExecError("Received result from failed job")
+    elif len(results) != expect_resultlen:
+      raise errors.OpExecError("Received %s results (%s), expected %s" %
+                               (len(results), results, expect_resultlen))
+
+    # Check received log messages
+    all_messages = [i for j in expect_messages for i in j]
+    if reporter.GetTestMessages() != all_messages:
+      raise errors.OpExecError("Received test messages don't match input"
+                               " (input %r, received %r)" %
+                               (all_messages, reporter.GetTestMessages()))
+
+    # Check final status
+    reported_job_id = reporter.GetJobId()
+    if reported_job_id != job_id:
+      raise errors.OpExecError("Reported job ID %s doesn't match"
+                               " submission job ID %s" %
+                               (reported_job_id, job_id))
+
+    jobdetails = cli.GetClient().QueryJobs([job_id], ["status", "opstatus"])[0]
+    if not jobdetails:
+      raise errors.OpExecError("Can't find job %s" % job_id)
+
+    if fail:
+      exp_status = constants.JOB_STATUS_ERROR
+    else:
+      exp_status = constants.JOB_STATUS_SUCCESS
+
+    (final_status, final_opstatus) = jobdetails
+    if final_status != exp_status:
+      raise errors.OpExecError("Final job status is %s, not %s as expected" %
+                               (final_status, exp_status))
+    if len(final_opstatus) != len(ops):
+      raise errors.OpExecError("Did not receive status for all opcodes (got %s,"
+                               " expected %s)" %
+                               (len(final_opstatus), len(ops)))
+    if final_opstatus != expect_opstatus:
+      raise errors.OpExecError("Opcode status is %s, expected %s" %
+                               (final_opstatus, expect_opstatus))
+
+  ToStdout("Job queue test successful")
+
+  return 0
+
+
 commands = {
   'delay': (
     Delay, [ArgUnknown(min=1, max=1)],
@@ -163,6 +405,8 @@ commands = {
                 action="store_false", help="Do not sleep in the master code"),
      cli_option("-n", dest="on_nodes", default=[],
                 action="append", help="Select nodes to sleep on"),
+     cli_option("-r", "--repeat", type="int", default="0", dest="repeat",
+                help="Number of times to repeat the sleep"),
      ],
     "[opts...] <duration>", "Executes a TestDelay OpCode"),
   'submit-job': (
@@ -205,6 +449,9 @@ commands = {
                 help="Comma separated list of tags"),
      ],
     "{opts...} <instance>", "Executes a TestAllocator OpCode"),
+  "test-jobqueue": (
+    TestJobqueue, ARGS_NONE, [],
+    "", "Test a few aspects of the job queue")
   }