4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 """Debugging commands"""
23 # pylint: disable-msg=W0401,W0614,C0103
24 # W0401: Wildcard import ganeti.cli
25 # W0614: Unused import %s from wildcard import (since we need cli)
26 # C0103: Invalid name gnt-backup
33 from ganeti.cli import *
34 from ganeti import cli
35 from ganeti import constants
36 from ganeti import opcodes
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import compat
# Field list that ListLocks hands to ParseFields together with opts.output.
# NOTE(review): the entries of this list are not shown in the reviewed text;
# confirm them against the full file.
42 #: Default fields for L{ListLocks}
43 _LIST_LOCKS_DEF_FIELDS = [
# Command handler (usage "[opts...] <duration>" in the command table): converts
# the first positional argument to a float and submits an OpTestDelay opcode
# built from it and from the on_master / on_nodes options.
51 def Delay(opts, args):
54 @param opts: the command line options selected by the user
56 @param args: should contain only one element, the duration
59 @return: the desired exit code
62 delay = float(args[0])
# float() raises ValueError for a non-numeric duration; no handler for it is
# visible in this function.
63 op = opcodes.OpTestDelay(duration=delay,
64 on_master=opts.on_master,
65 on_nodes=opts.on_nodes,
67 SubmitOpCode(op, opts=opts)
# Command handler: reads JSON files containing lists of serialized opcodes,
# queues them as jobs on a cli.JobExecutor and submits them.  When
# opts.timing_stats is set it additionally prints progress markers, the
# opcode/job counters and the elapsed times.
72 def GenericOpCodes(opts, args):
73 """Send any opcode to the master.
75 @param opts: the command line options selected by the user
77 @param args: should contain only one element, the path of
78 the file with the opcode definition
80 @return: the desired exit code
84 jex = cli.JobExecutor(cl=cl, verbose=opts.verbose, opts=opts)
89 ToStdout("Loading...")
# opts.rep_job (--job-repeat) sets the number of queueing rounds; in the
# visible code job_idx is only used to build the job name given to QueueJob.
90 for job_idx in range(opts.rep_job):
92 # pylint: disable-msg=W0142
93 op_data = simplejson.loads(utils.ReadFile(fname))
# Every element of the decoded JSON list is turned into an opcode object.
94 op_list = [opcodes.OpCode.LoadOpCode(val) for val in op_data]
# List repetition: the whole opcode sequence is repeated opts.rep_op times
# (--op-repeat) inside the same job.
95 op_list = op_list * opts.rep_op
96 jex.QueueJob("file %s/%d" % (fname, job_idx), *op_list)
97 op_cnt += len(op_list)
100 if opts.timing_stats:
102 ToStdout("Submitting...")
104 jex.SubmitPending(each=opts.each)
106 if opts.timing_stats:
108 ToStdout("Executing...")
111 if opts.timing_stats:
113 ToStdout("C:op %4d" % op_cnt)
114 ToStdout("C:job %4d" % job_cnt)
# Going by the labels, t1/t2/t3 are timestamps taken before submission, after
# submission and after execution.  NOTE(review): their assignments (and those
# of job_cnt / op_cnt) are not shown in the reviewed text; confirm.
115 ToStdout("T:submit %4.4f" % (t2-t1))
116 ToStdout("T:exec %4.4f" % (t3-t2))
117 ToStdout("T:total %4.4f" % (t3-t1))
# Command handler (usage "{opts...} <instance>" in the command table): builds
# disk and NIC descriptions from the --disks / --nics option strings, splits
# the --tags string, submits an OpTestAllocator opcode and prints its result.
# NOTE(review): "except X, err" below is Python 2-only syntax.
121 def TestAllocator(opts, args):
122 """Runs the test allocator opcode.
124 @param opts: the command line options selected by the user
126 @param args: should contain only one element, the iallocator name
128 @return: the desired exit code
133 constants.IDISK_SIZE: utils.ParseUnit(val),
134 constants.IDISK_MODE: constants.DISK_RDWR,
135 } for val in opts.disks.split(",")]
136 except errors.UnitParseError, err:
# A disk size that utils.ParseUnit rejects is reported on stderr together with
# the complete --disks value.
137 ToStderr("Invalid disks parameter '%s': %s", opts.disks, err)
# NICs are comma separated; each definition is split on "/" into its parts
# (the --nics help text describes the form as mac/ip/bridge).
140 nics = [val.split("/") for val in opts.nics.split(",")]
148 constants.INIC_MAC: v[0],
149 constants.INIC_IP: v[1],
150 # The iallocator interface defines a "bridge" item
154 if opts.tags is None:
# opts.tags is overwritten in place: the comma separated string becomes a list.
157 opts.tags = opts.tags.split(",")
159 op = opcodes.OpTestAllocator(mode=opts.mode,
165 disk_template=opts.disk_template,
170 direction=opts.direction,
171 allocator=opts.iallocator,
172 reloc_mode=opts.reloc_mode,
173 target_groups=opts.target_groups)
174 result = SubmitOpCode(op, opts=opts)
# The opcode result is printed via its "%s" string conversion.
175 ToStdout("%s" % result)
# Self-test for job submission.  Each testdata entry is a
# (before, after, failpriority) triple: a job is assembled from `before`
# zero-duration OpTestDelay opcodes, one OpTestDelay carrying `failpriority`,
# and `after` more zero-duration opcodes.  An OpExecError is raised when such a
# submission "did not fail when it should".  Afterwards SubmitManyJobs is
# called and the shape of its result is verified.
# NOTE(review): "except X, err" below is Python 2-only syntax.
179 def _TestJobSubmission(opts):
180 """Tests submitting jobs.
183 ToStdout("Testing job submission")
186 (0, 0, constants.OP_PRIO_LOWEST),
187 (0, 0, constants.OP_PRIO_HIGHEST),
# For every priority in OP_PRIO_SUBMIT_VALID, plus OP_PRIO_LOWEST and
# OP_PRIO_HIGHEST, the neighbouring values priority-1 and priority+1 are
# generated, each with several before/after layouts.
190 for priority in (constants.OP_PRIO_SUBMIT_VALID |
191 frozenset([constants.OP_PRIO_LOWEST,
192 constants.OP_PRIO_HIGHEST])):
193 for offset in [-1, +1]:
195 (0, 0, priority + offset),
196 (3, 0, priority + offset),
197 (0, 3, priority + offset),
198 (4, 2, priority + offset),
203 for before, after, failpriority in testdata:
205 ops.extend([opcodes.OpTestDelay(duration=0) for _ in range(before)])
206 ops.append(opcodes.OpTestDelay(duration=0, priority=failpriority))
207 ops.extend([opcodes.OpTestDelay(duration=0) for _ in range(after)])
211 except errors.GenericError, err:
213 ToStdout("Ignoring error: %s", err)
215 raise errors.OpExecError("Submitting opcode with priority %s did not"
216 " fail when it should (allowed are %s)" %
217 (failpriority, constants.OP_PRIO_SUBMIT_VALID))
220 [opcodes.OpTestDelay(duration=0),
221 opcodes.OpTestDelay(duration=0, dry_run=False),
222 opcodes.OpTestDelay(duration=0, dry_run=True)],
225 result = cl.SubmitManyJobs(jobs)
# Expected shape: exactly two 2-element entries whose second element is a
# string; the first entry's leading element must be true and the second
# entry's must be false.
226 if not (len(result) == 2 and
227 compat.all(len(i) == 2 for i in result) and
228 compat.all(isinstance(i[1], basestring) for i in result) and
229 result[0][0] and not result[1][0]):
230 raise errors.OpExecError("Submitting multiple jobs did not work as"
231 " expected, result %s" % result)
# NOTE(review): this assert repeats the len(result) == 2 condition already
# enforced by the check above.
232 assert len(result) == 2
234 ToStdout("Job submission tests were successful")
# Job poll reporter derived from cli.StdioJobPollReportCb.  It intercepts
# job-queue test log entries (constants.ELOG_JQUEUE_TEST), collects log
# messages that start with constants.JQT_MSGPREFIX, and passes every other log
# message on to the base class.  One instance must only be used for one job.
# NOTE(review): the "def" lines of __init__ and GetJobId and the
# initialisation of self._job_id are not shown in the reviewed text.
237 class _JobQueueTestReporter(cli.StdioJobPollReportCb):
239 """Initializes this class.
242 cli.StdioJobPollReportCb.__init__(self)
# _all_testmsgs accumulates every test message received; _testmsgs is None at
# construction time and ReportLogMessage refuses test messages while it is
# still None.
243 self._expected_msgcount = 0
244 self._all_testmsgs = []
245 self._testmsgs = None
248 def GetTestMessages(self):
249 """Returns all test log messages received so far.
252 return self._all_testmsgs
255 """Returns the job ID.
260 def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
261 """Handles a log message.
264 if self._job_id is None:
# First message seen: remember which job this reporter is bound to.
265 self._job_id = job_id
266 elif self._job_id != job_id:
267 raise errors.ProgrammerError("The same reporter instance was used for"
268 " more than one job")
270 if log_type == constants.ELOG_JQUEUE_TEST:
# Test entries carry a (socket name, test kind, argument) triple.
271 (sockname, test, arg) = log_msg
272 return self._ProcessTestMessage(job_id, sockname, test, arg)
274 elif (log_type == constants.ELOG_MESSAGE and
275 log_msg.startswith(constants.JQT_MSGPREFIX)):
276 if self._testmsgs is None:
277 raise errors.OpExecError("Received test message without a preceding"
279 testmsg = log_msg[len(constants.JQT_MSGPREFIX):]
# The message, with the prefix stripped, is stored both in the current batch
# and in the complete history returned by GetTestMessages.
280 self._testmsgs.append(testmsg)
281 self._all_testmsgs.append(testmsg)
284 return cli.StdioJobPollReportCb.ReportLogMessage(self, job_id, serial,
288 def _ProcessTestMessage(self, job_id, sockname, test, arg):
289 """Handles a job queue test message.
292 if test not in constants.JQT_ALL:
293 raise errors.OpExecError("Received invalid test message %s" % test)
295 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# Operations on the UNIX stream socket are limited to 30 seconds.
297 sock.settimeout(30.0)
299 logging.debug("Connecting to %s", sockname)
300 sock.connect(sockname)
302 logging.debug("Checking status")
# A client obtained via cli.GetClient() is used to query the job status.
303 jobdetails = cli.GetClient().QueryJobs([job_id], ["status"])[0]
305 raise errors.OpExecError("Can't find job %s" % job_id)
307 status = jobdetails[0]
309 logging.debug("Status of job %s is %s", job_id, status)
# JQT_EXPANDNAMES requires status JOB_STATUS_WAITLOCK; JQT_EXEC and JQT_LOGMSG
# require JOB_STATUS_RUNNING.
311 if test == constants.JQT_EXPANDNAMES:
312 if status != constants.JOB_STATUS_WAITLOCK:
313 raise errors.OpExecError("Job status while expanding names is '%s',"
314 " not '%s' as expected" %
315 (status, constants.JOB_STATUS_WAITLOCK))
316 elif test in (constants.JQT_EXEC, constants.JQT_LOGMSG):
317 if status != constants.JOB_STATUS_RUNNING:
318 raise errors.OpExecError("Job status while executing opcode is '%s',"
319 " not '%s' as expected" %
320 (status, constants.JOB_STATUS_RUNNING))
322 if test == constants.JQT_STARTMSG:
323 logging.debug("Expecting %s test messages", arg)
325 elif test == constants.JQT_LOGMSG:
# For JQT_LOGMSG the argument is compared with the number of test messages
# collected in the current batch.
326 if len(self._testmsgs) != arg:
327 raise errors.OpExecError("Received %s test messages when %s are"
328 " expected" % (len(self._testmsgs), arg))
330 logging.debug("Closing socket")
# Command handler: first runs _TestJobSubmission, then submits jobs made of
# OpTestJqueue opcodes according to a test mode (TM_*), polls them with a
# _JobQueueTestReporter and verifies the results, the collected test messages,
# the reported job ID and the final job and opcode statuses.
# NOTE(review): the loop header selecting `mode` and several assignments
# (cl, ops, test_messages, expect_resultlen) are not shown in the reviewed
# text.  "except X, err" below is Python 2-only syntax.
334 def TestJobqueue(opts, _):
335 """Runs a few tests on the job queue.
338 _TestJobSubmission(opts)
343 TM_PARTFAIL) = range(4)
# TM_ALL collects the four mode constants.
344 TM_ALL = frozenset([TM_SUCCESS, TM_MULTISUCCESS, TM_FAIL, TM_PARTFAIL])
348 "Testing mode %s" % mode,
354 utils.TimestampForFilename(),
# Both TM_FAIL and TM_PARTFAIL are modes in which the job is expected to fail.
357 fail = mode in (TM_FAIL, TM_PARTFAIL)
359 if mode == TM_PARTFAIL:
360 ToStdout("Testing partial job failure")
362 opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
363 log_messages=test_messages, fail=False),
364 opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
365 log_messages=test_messages, fail=False),
366 opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
367 log_messages=test_messages, fail=True),
368 opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
369 log_messages=test_messages, fail=False),
# Three message batches are expected for four opcodes: the third opcode is the
# one created with fail=True, and the fourth is expected to end with ERROR
# status too, so presumably it never logs - confirm.
371 expect_messages = 3 * [test_messages]
373 constants.OP_STATUS_SUCCESS,
374 constants.OP_STATUS_SUCCESS,
375 constants.OP_STATUS_ERROR,
376 constants.OP_STATUS_ERROR,
379 elif mode == TM_MULTISUCCESS:
380 ToStdout("Testing multiple successful opcodes")
382 opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
383 log_messages=test_messages, fail=False),
384 opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
385 log_messages=test_messages, fail=False),
387 expect_messages = 2 * [test_messages]
389 constants.OP_STATUS_SUCCESS,
390 constants.OP_STATUS_SUCCESS,
394 if mode == TM_SUCCESS:
395 ToStdout("Testing job success")
396 expect_opstatus = [constants.OP_STATUS_SUCCESS]
397 elif mode == TM_FAIL:
398 ToStdout("Testing job failure")
399 expect_opstatus = [constants.OP_STATUS_ERROR]
401 raise errors.ProgrammerError("Unknown test mode %s" % mode)
404 opcodes.OpTestJqueue(notify_waitlock=True,
406 log_messages=test_messages,
409 expect_messages = [test_messages]
413 cli.SetGenericOpcodeOpts(ops, opts)
415 # Send job to master daemon
416 job_id = cli.SendJob(ops, cl=cl)
# A fresh reporter is created for the job that was just submitted.
418 reporter = _JobQueueTestReporter()
422 results = cli.PollJob(job_id, cl=cl, reporter=reporter)
423 except errors.OpExecError, err:
426 ToStdout("Ignoring error: %s", err)
429 raise errors.OpExecError("Job didn't fail when it should")
431 # Check length of result
433 if results is not None:
434 raise errors.OpExecError("Received result from failed job")
435 elif len(results) != expect_resultlen:
436 raise errors.OpExecError("Received %s results (%s), expected %s" %
437 (len(results), results, expect_resultlen))
439 # Check received log messages
# expect_messages is a list of per-opcode message lists; it is flattened so it
# can be compared with the flat list kept by the reporter.
440 all_messages = [i for j in expect_messages for i in j]
441 if reporter.GetTestMessages() != all_messages:
442 raise errors.OpExecError("Received test messages don't match input"
443 " (input %r, received %r)" %
444 (all_messages, reporter.GetTestMessages()))
447 reported_job_id = reporter.GetJobId()
448 if reported_job_id != job_id:
# NOTE(review): the two adjacent string literals below are concatenated
# without a separating space, so the message reads "...doesn't
# matchsubmission job ID ...".
449 raise errors.OpExecError("Reported job ID %s doesn't match"
450 "submission job ID %s" %
451 (reported_job_id, job_id))
453 jobdetails = cli.GetClient().QueryJobs([job_id], ["status", "opstatus"])[0]
455 raise errors.OpExecError("Can't find job %s" % job_id)
458 exp_status = constants.JOB_STATUS_ERROR
460 exp_status = constants.JOB_STATUS_SUCCESS
462 (final_status, final_opstatus) = jobdetails
463 if final_status != exp_status:
464 raise errors.OpExecError("Final job status is %s, not %s as expected" %
465 (final_status, exp_status))
# One opcode status is required per submitted opcode before the status values
# themselves are compared.
466 if len(final_opstatus) != len(ops):
467 raise errors.OpExecError("Did not receive status for all opcodes (got %s,"
469 (len(final_opstatus), len(ops)))
470 if final_opstatus != expect_opstatus:
471 raise errors.OpExecError("Opcode status is %s, expected %s" %
472 (final_opstatus, expect_opstatus))
474 ToStdout("Job queue test successful")
# Command handler (usage "[--interval N]" in the command table): prints the
# lock list through GenericList(constants.QR_LOCK, ...) with custom formatters
# for the "mode", "owner" and "pending" fields.  When opts.interval is set the
# code sleeps for that many seconds, presumably before listing again - confirm
# (the loop header is not shown in the reviewed text).
479 def ListLocks(opts, args): # pylint: disable-msg=W0613
482 @param opts: the command line options selected by the user
484 @param args: should be an empty list
486 @return: the desired exit code
489 selected_fields = ParseFields(opts.output, _LIST_LOCKS_DEF_FIELDS)
# _FormatPending treats `value` as an iterable of (mode, threads) pairs and
# renders each pair as "mode:thread1,thread2"; utils.CommaJoin then joins the
# rendered pairs.
498 def _FormatPending(value):
499 """Format pending acquires.
502 return utils.CommaJoin("%s:%s" % (mode, ",".join(threads))
503 for mode, threads in value)
# Per-field overrides passed to GenericList as format_override; every
# formatter is wrapped by _DashIfNone.  NOTE(review): _DashIfNone's definition
# is not shown in the reviewed text.
507 "mode": (_DashIfNone(str), False),
508 "owner": (_DashIfNone(",".join), False),
509 "pending": (_DashIfNone(_FormatPending), False),
513 ret = GenericList(constants.QR_LOCK, selected_fields, None, None,
514 opts.separator, not opts.no_headers,
515 format_override=fmtoverride, verbose=opts.verbose)
# Any result other than constants.EXIT_SUCCESS is handled separately here.
517 if ret != constants.EXIT_SUCCESS:
520 if not opts.interval:
524 time.sleep(opts.interval)
# Command table passed to GenericMain.  Each visible entry lists, in order: the
# handler function, the positional-argument specification, the list of
# accepted options, a usage string and a one-line description.
# NOTE(review): the opening of the table and the command-name keys are not
# shown in the reviewed text.
531 Delay, [ArgUnknown(min=1, max=1)],
532 [cli_option("--no-master", dest="on_master", default=True,
533 action="store_false", help="Do not sleep in the master code"),
534 cli_option("-n", dest="on_nodes", default=[],
535 action="append", help="Select nodes to sleep on"),
# NOTE(review): the default is the string "0" although the type is "int";
# confirm the option parser converts string defaults.
536 cli_option("-r", "--repeat", type="int", default="0", dest="repeat",
537 help="Number of times to repeat the sleep"),
538 DRY_RUN_OPT, PRIORITY_OPT,
540 "[opts...] <duration>", "Executes a TestDelay OpCode"),
# Entry for GenericOpCodes: at least one file argument is required.
542 GenericOpCodes, [ArgFile(min=1)],
544 cli_option("--op-repeat", type="int", default="1", dest="rep_op",
545 help="Repeat the opcode sequence this number of times"),
546 cli_option("--job-repeat", type="int", default="1", dest="rep_job",
547 help="Repeat the job this number of times"),
548 cli_option("--timing-stats", default=False,
549 action="store_true", help="Show timing stats"),
550 cli_option("--each", default=False, action="store_true",
551 help="Submit each job separately"),
552 DRY_RUN_OPT, PRIORITY_OPT,
554 "<op_list_file...>", "Submits jobs built from json files"
555 " containing a list of serialized opcodes"),
# Entry for TestAllocator: at least one positional argument is required.
557 TestAllocator, [ArgUnknown(min=1)],
558 [cli_option("--dir", dest="direction", default=constants.IALLOCATOR_DIR_IN,
559 choices=list(constants.VALID_IALLOCATOR_DIRECTIONS),
560 help="Show allocator input (in) or allocator"
563 cli_option("-m", "--mode", default="relocate",
564 choices=list(constants.VALID_IALLOCATOR_MODES),
565 help=("Request mode (one of %s)" %
566 utils.CommaJoin(constants.VALID_IALLOCATOR_MODES))),
567 cli_option("--mem", default=128, type="unit",
568 help="Memory size for the instance (MiB)"),
569 cli_option("--disks", default="4096,4096",
570 help="Comma separated list of disk sizes (MiB)"),
# NOTE(review): the help text below says "are replace by None"; it presumably
# should read "are replaced by None".
572 cli_option("--nics", default="00:11:22:33:44:55",
573 help="Comma separated list of nics, each nic"
574 " definition is of form mac/ip/bridge, if"
575 " missing values are replace by None"),
577 cli_option("-p", "--vcpus", default=1, type="int",
578 help="Select number of VCPUs for the instance"),
579 cli_option("--tags", default=None,
580 help="Comma separated list of tags"),
581 cli_option("--reloc-mode", default=constants.IALLOCATOR_MRELOC_ANY,
582 choices=list(constants.IALLOCATOR_MRELOC_MODES),
583 help=("Instance relocation mode (one of %s)" %
584 utils.CommaJoin(constants.IALLOCATOR_MRELOC_MODES))),
585 cli_option("--target-groups", help="Target groups for relocation"),
586 DRY_RUN_OPT, PRIORITY_OPT,
588 "{opts...} <instance>", "Executes a TestAllocator OpCode"),
# Entry for TestJobqueue: no positional arguments, only the priority option.
590 TestJobqueue, ARGS_NONE, [PRIORITY_OPT],
591 "", "Test a few aspects of the job queue"),
# Entry for ListLocks: no positional arguments; header, separator, field,
# interval and verbosity options are accepted.
593 ListLocks, ARGS_NONE,
594 [NOHDR_OPT, SEP_OPT, FIELDS_OPT, INTERVAL_OPT, VERBOSE_OPT],
595 "[--interval N]", "Show a list of locks in the master daemon"),
# Hands the command table to GenericMain and returns its result.
# NOTE(review): the enclosing "def" line is not shown in the reviewed text;
# presumably this is the script's entry point - confirm.
600 return GenericMain(commands)