Revision e58f87a9 scripts/gnt-debug

b/scripts/gnt-debug
28 28
import sys
29 29
import simplejson
30 30
import time
31
import socket
32
import logging
31 33

  
32 34
from ganeti.cli import *
33 35
from ganeti import cli
36
from ganeti import constants
34 37
from ganeti import opcodes
35 38
from ganeti import utils
36 39
from ganeti import errors
......
155 158
  return 0
156 159

  
157 160

  
161
class _JobQueueTestReporter(cli.StdioJobPollReportCb):
162
  def __init__(self):
163
    """Initializes this class.
164

  
165
    """
166
    cli.StdioJobPollReportCb.__init__(self)
167
    self._testmsgs = []
168
    self._job_id = None
169

  
170
  def GetTestMessages(self):
171
    """Returns all test log messages received so far.
172

  
173
    """
174
    return self._testmsgs
175

  
176
  def GetJobId(self):
177
    """Returns the job ID.
178

  
179
    """
180
    return self._job_id
181

  
182
  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
183
    """Handles a log message.
184

  
185
    """
186
    if self._job_id is None:
187
      self._job_id = job_id
188
    elif self._job_id != job_id:
189
      raise errors.ProgrammerError("The same reporter instance was used for"
190
                                   " more than one job")
191

  
192
    if log_type == constants.ELOG_JQUEUE_TEST:
193
      (sockname, test, arg) = log_msg
194
      return self._ProcessTestMessage(job_id, sockname, test, arg)
195

  
196
    elif (log_type == constants.ELOG_MESSAGE and
197
          log_msg.startswith(constants.JQT_MSGPREFIX)):
198
      self._testmsgs.append(log_msg[len(constants.JQT_MSGPREFIX):])
199
      return
200

  
201
    return cli.StdioJobPollReportCb.ReportLogMessage(self, job_id, serial,
202
                                                     timestamp, log_type,
203
                                                     log_msg)
204

  
205
  def _ProcessTestMessage(self, job_id, sockname, test, arg):
206
    """Handles a job queue test message.
207

  
208
    """
209
    if test not in constants.JQT_ALL:
210
      raise errors.OpExecError("Received invalid test message %s" % test)
211

  
212
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
213
    try:
214
      sock.settimeout(30.0)
215

  
216
      logging.debug("Connecting to %s", sockname)
217
      sock.connect(sockname)
218

  
219
      logging.debug("Checking status")
220
      jobdetails = cli.GetClient().QueryJobs([job_id], ["status"])[0]
221
      if not jobdetails:
222
        raise errors.OpExecError("Can't find job %s" % job_id)
223

  
224
      status = jobdetails[0]
225

  
226
      logging.debug("Status of job %s is %s", job_id, status)
227

  
228
      if test == constants.JQT_EXPANDNAMES:
229
        if status != constants.JOB_STATUS_WAITLOCK:
230
          raise errors.OpExecError("Job status while expanding names is '%s',"
231
                                   " not '%s' as expected" %
232
                                   (status, constants.JOB_STATUS_WAITLOCK))
233
      elif test in (constants.JQT_EXEC, constants.JQT_LOGMSG):
234
        if status != constants.JOB_STATUS_RUNNING:
235
          raise errors.OpExecError("Job status while executing opcode is '%s',"
236
                                   " not '%s' as expected" %
237
                                   (status, constants.JOB_STATUS_RUNNING))
238

  
239
      if test == constants.JQT_LOGMSG:
240
        if len(self._testmsgs) != arg:
241
          raise errors.OpExecError("Received %s test messages when %s are"
242
                                   " expected" % (len(self._testmsgs), arg))
243
    finally:
244
      logging.debug("Closing socket")
245
      sock.close()
246

  
247

  
248
def TestJobqueue(opts, _):
249
  """Runs a few tests on the job queue.
250

  
251
  """
252
  test_messages = [
253
    "Hello World",
254
    "A",
255
    "",
256
    "B"
257
    "Foo|bar|baz",
258
    utils.TimestampForFilename(),
259
    ]
260

  
261
  for fail in [False, True]:
262
    if fail:
263
      ToStdout("Testing job failure")
264
    else:
265
      ToStdout("Testing job success")
266

  
267
    op = opcodes.OpTestJobqueue(notify_waitlock=True,
268
                                notify_exec=True,
269
                                log_messages=test_messages,
270
                                fail=fail)
271

  
272
    reporter = _JobQueueTestReporter()
273
    try:
274
      SubmitOpCode(op, reporter=reporter, opts=opts)
275
    except errors.OpExecError, err:
276
      if not fail:
277
        raise
278
      # Ignore error
279
    else:
280
      if fail:
281
        raise errors.OpExecError("Job didn't fail when it should")
282

  
283
    # Check received log messages
284
    if reporter.GetTestMessages() != test_messages:
285
      raise errors.OpExecError("Received test messages don't match input"
286
                               " (input %r, received %r)" %
287
                               (test_messages, reporter.GetTestMessages()))
288

  
289
    # Check final status
290
    job_id = reporter.GetJobId()
291

  
292
    jobdetails = cli.GetClient().QueryJobs([job_id], ["status"])[0]
293
    if not jobdetails:
294
      raise errors.OpExecError("Can't find job %s" % job_id)
295

  
296
    if fail:
297
      exp_status = constants.JOB_STATUS_ERROR
298
    else:
299
      exp_status = constants.JOB_STATUS_SUCCESS
300

  
301
    final_status = jobdetails[0]
302
    if final_status != exp_status:
303
      raise errors.OpExecError("Final job status is %s, not %s as expected" %
304
                               (final_status, exp_status))
305

  
306
  return 0
307

  
308

  
158 309
commands = {
159 310
  'delay': (
160 311
    Delay, [ArgUnknown(min=1, max=1)],
......
206 357
                help="Comma separated list of tags"),
207 358
     ],
208 359
    "{opts...} <instance>", "Executes a TestAllocator OpCode"),
360
  "test-jobqueue": (
361
    TestJobqueue, ARGS_NONE, [],
362
    "", "Test a few aspects of the job queue")
209 363
  }
210 364

  
211 365

  

Also available in: Unified diff