4 # Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
21 """Debugging commands"""
23 # pylint: disable=W0401,W0614,C0103
24 # W0401: Wildcard import ganeti.cli
25 # W0614: Unused import %s from wildcard import (since we need cli)
26 # C0103: Invalid name gnt-debug
33 from ganeti.cli import *
34 from ganeti import cli
35 from ganeti import constants
36 from ganeti import opcodes
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import compat
43 #: Default fields for L{ListLocks}
44 _LIST_LOCKS_DEF_FIELDS = [
52 def Delay(opts, args):
55 @param opts: the command line options selected by the user
57 @param args: should contain only one element, the duration
60 @return: the desired exit code
63 delay = float(args[0])
64 op = opcodes.OpTestDelay(duration=delay,
65 on_master=opts.on_master,
66 on_nodes=opts.on_nodes,
68 SubmitOrSend(op, opts)
73 def GenericOpCodes(opts, args):
74 """Send any opcode to the master.
76 @param opts: the command line options selected by the user
78 @param args: should contain only one element, the path of
79 the file with the opcode definition
81 @return: the desired exit code
85 jex = cli.JobExecutor(cl=cl, verbose=opts.verbose, opts=opts)
90 ToStdout("Loading...")
91 for job_idx in range(opts.rep_job):
93 # pylint: disable=W0142
94 op_data = simplejson.loads(utils.ReadFile(fname))
95 op_list = [opcodes.OpCode.LoadOpCode(val) for val in op_data]
96 op_list = op_list * opts.rep_op
97 jex.QueueJob("file %s/%d" % (fname, job_idx), *op_list)
98 op_cnt += len(op_list)
101 if opts.timing_stats:
103 ToStdout("Submitting...")
105 jex.SubmitPending(each=opts.each)
107 if opts.timing_stats:
109 ToStdout("Executing...")
112 if opts.timing_stats:
114 ToStdout("C:op %4d" % op_cnt)
115 ToStdout("C:job %4d" % job_cnt)
116 ToStdout("T:submit %4.4f" % (t2 - t1))
117 ToStdout("T:exec %4.4f" % (t3 - t2))
118 ToStdout("T:total %4.4f" % (t3 - t1))
122 def TestAllocator(opts, args):
123 """Runs the test allocator opcode.
125 @param opts: the command line options selected by the user
127 @param args: should contain only one element, the iallocator name
129 @return: the desired exit code
134 constants.IDISK_SIZE: utils.ParseUnit(val),
135 constants.IDISK_MODE: constants.DISK_RDWR,
136 } for val in opts.disks.split(",")]
137 except errors.UnitParseError, err:
138 ToStderr("Invalid disks parameter '%s': %s", opts.disks, err)
141 nics = [val.split("/") for val in opts.nics.split(",")]
149 constants.INIC_MAC: v[0],
150 constants.INIC_IP: v[1],
151 # The iallocator interface defines a "bridge" item
155 if opts.tags is None:
158 opts.tags = opts.tags.split(",")
159 if opts.target_groups is None:
162 target_groups = opts.target_groups
164 op = opcodes.OpTestAllocator(mode=opts.mode,
169 disk_template=opts.disk_template,
174 direction=opts.direction,
175 iallocator=opts.iallocator,
176 evac_mode=opts.evac_mode,
177 target_groups=target_groups,
178 spindle_use=opts.spindle_use,
180 result = SubmitOpCode(op, opts=opts)
181 ToStdout("%s" % result)
185 def _TestJobDependency(opts):
186 """Tests job dependencies.
189 ToStdout("Testing job dependencies")
194 SubmitOpCode(opcodes.OpTestDelay(duration=0, depends=[(-1, None)]), cl=cl)
195 except errors.GenericError, err:
197 ToStdout("Ignoring error for 'wrong dependencies' test: %s", err)
199 raise errors.OpExecError("Submitting plain opcode with relative job ID"
200 " did not fail as expected")
202 # TODO: Test dependencies on errors
204 [opcodes.OpTestDelay(duration=1)],
205 [opcodes.OpTestDelay(duration=1,
206 depends=[(-1, [])])],
207 [opcodes.OpTestDelay(duration=1,
208 depends=[(-2, [constants.JOB_STATUS_SUCCESS])])],
209 [opcodes.OpTestDelay(duration=1,
211 [opcodes.OpTestDelay(duration=1,
212 depends=[(-2, [constants.JOB_STATUS_SUCCESS])])],
215 # Function for checking result
216 check_fn = ht.TListOf(ht.TAnd(ht.TIsLength(2),
218 ht.TOr(ht.TNonEmptyString,
221 result = cl.SubmitManyJobs(jobs)
222 if not check_fn(result):
223 raise errors.OpExecError("Job submission doesn't match %s: %s" %
226 # Wait for jobs to finish
227 jex = JobExecutor(cl=cl, opts=opts)
229 for (status, job_id) in result:
230 jex.AddJobId(None, status, job_id)
232 job_results = jex.GetResults()
233 if not compat.all(row[0] for row in job_results):
234 raise errors.OpExecError("At least one of the submitted jobs failed: %s" %
237 # Get details about jobs
238 data = cl.QueryJobs([job_id for (_, job_id) in result],
239 ["id", "opexec", "ops"])
240 data_job_id = [job_id for (job_id, _, _) in data]
241 data_opexec = [opexec for (_, opexec, _) in data]
242 data_op = [[opcodes.OpCode.LoadOpCode(op) for op in ops]
243 for (_, _, ops) in data]
245 assert compat.all(not op.depends or len(op.depends) == 1
249 # Check resolved job IDs in dependencies
250 for (job_idx, res_jobdep) in [(1, data_job_id[0]),
252 (4, data_job_id[2])]:
253 if data_op[job_idx][0].depends[0][0] != res_jobdep:
254 raise errors.OpExecError("Job %s's opcode doesn't depend on correct job"
255 " ID (%s)" % (job_idx, res_jobdep))
257 # Check execution order
258 if not (data_opexec[0] <= data_opexec[1] and
259 data_opexec[0] <= data_opexec[2] and
260 data_opexec[2] <= data_opexec[4]):
261 raise errors.OpExecError("Jobs did not run in correct order: %s" % data)
263 assert len(jobs) == 5 and compat.all(len(ops) == 1 for ops in jobs)
265 ToStdout("Job dependency tests were successful")
268 def _TestJobSubmission(opts):
269 """Tests submitting jobs.
272 ToStdout("Testing job submission")
275 (0, 0, constants.OP_PRIO_LOWEST),
276 (0, 0, constants.OP_PRIO_HIGHEST),
279 for priority in (constants.OP_PRIO_SUBMIT_VALID |
280 frozenset([constants.OP_PRIO_LOWEST,
281 constants.OP_PRIO_HIGHEST])):
282 for offset in [-1, +1]:
284 (0, 0, priority + offset),
285 (3, 0, priority + offset),
286 (0, 3, priority + offset),
287 (4, 2, priority + offset),
292 for before, after, failpriority in testdata:
294 ops.extend([opcodes.OpTestDelay(duration=0) for _ in range(before)])
295 ops.append(opcodes.OpTestDelay(duration=0, priority=failpriority))
296 ops.extend([opcodes.OpTestDelay(duration=0) for _ in range(after)])
300 except errors.GenericError, err:
302 ToStdout("Ignoring error for 'wrong priority' test: %s", err)
304 raise errors.OpExecError("Submitting opcode with priority %s did not"
305 " fail when it should (allowed are %s)" %
306 (failpriority, constants.OP_PRIO_SUBMIT_VALID))
309 [opcodes.OpTestDelay(duration=0),
310 opcodes.OpTestDelay(duration=0, dry_run=False),
311 opcodes.OpTestDelay(duration=0, dry_run=True)],
314 result = cl.SubmitManyJobs(jobs)
315 if not (len(result) == 2 and
316 compat.all(len(i) == 2 for i in result) and
317 isinstance(result[0][1], int) and
318 isinstance(result[1][1], basestring) and
319 result[0][0] and not result[1][0]):
320 raise errors.OpExecError("Submitting multiple jobs did not work as"
321 " expected, result %s" % result)
322 assert len(result) == 2
324 ToStdout("Job submission tests were successful")
327 class _JobQueueTestReporter(cli.StdioJobPollReportCb):
329 """Initializes this class.
332 cli.StdioJobPollReportCb.__init__(self)
333 self._expected_msgcount = 0
334 self._all_testmsgs = []
335 self._testmsgs = None
338 def GetTestMessages(self):
339 """Returns all test log messages received so far.
342 return self._all_testmsgs
345 """Returns the job ID.
350 def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
351 """Handles a log message.
354 if self._job_id is None:
355 self._job_id = job_id
356 elif self._job_id != job_id:
357 raise errors.ProgrammerError("The same reporter instance was used for"
358 " more than one job")
360 if log_type == constants.ELOG_JQUEUE_TEST:
361 (sockname, test, arg) = log_msg
362 return self._ProcessTestMessage(job_id, sockname, test, arg)
364 elif (log_type == constants.ELOG_MESSAGE and
365 log_msg.startswith(constants.JQT_MSGPREFIX)):
366 if self._testmsgs is None:
367 raise errors.OpExecError("Received test message without a preceding"
369 testmsg = log_msg[len(constants.JQT_MSGPREFIX):]
370 self._testmsgs.append(testmsg)
371 self._all_testmsgs.append(testmsg)
374 return cli.StdioJobPollReportCb.ReportLogMessage(self, job_id, serial,
378 def _ProcessTestMessage(self, job_id, sockname, test, arg):
379 """Handles a job queue test message.
382 if test not in constants.JQT_ALL:
383 raise errors.OpExecError("Received invalid test message %s" % test)
385 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
387 sock.settimeout(30.0)
389 logging.debug("Connecting to %s", sockname)
390 sock.connect(sockname)
392 logging.debug("Checking status")
393 jobdetails = cli.GetClient().QueryJobs([job_id], ["status"])[0]
395 raise errors.OpExecError("Can't find job %s" % job_id)
397 status = jobdetails[0]
399 logging.debug("Status of job %s is %s", job_id, status)
401 if test == constants.JQT_EXPANDNAMES:
402 if status != constants.JOB_STATUS_WAITING:
403 raise errors.OpExecError("Job status while expanding names is '%s',"
404 " not '%s' as expected" %
405 (status, constants.JOB_STATUS_WAITING))
406 elif test in (constants.JQT_EXEC, constants.JQT_LOGMSG):
407 if status != constants.JOB_STATUS_RUNNING:
408 raise errors.OpExecError("Job status while executing opcode is '%s',"
409 " not '%s' as expected" %
410 (status, constants.JOB_STATUS_RUNNING))
412 if test == constants.JQT_STARTMSG:
413 logging.debug("Expecting %s test messages", arg)
415 elif test == constants.JQT_LOGMSG:
416 if len(self._testmsgs) != arg:
417 raise errors.OpExecError("Received %s test messages when %s are"
418 " expected" % (len(self._testmsgs), arg))
420 logging.debug("Closing socket")
424 def TestJobqueue(opts, _):
425 """Runs a few tests on the job queue.
428 _TestJobSubmission(opts)
429 _TestJobDependency(opts)
434 TM_PARTFAIL) = range(4)
435 TM_ALL = frozenset([TM_SUCCESS, TM_MULTISUCCESS, TM_FAIL, TM_PARTFAIL])
439 "Testing mode %s" % mode,
445 utils.TimestampForFilename(),
448 fail = mode in (TM_FAIL, TM_PARTFAIL)
450 if mode == TM_PARTFAIL:
451 ToStdout("Testing partial job failure")
453 opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
454 log_messages=test_messages, fail=False),
455 opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
456 log_messages=test_messages, fail=False),
457 opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
458 log_messages=test_messages, fail=True),
459 opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
460 log_messages=test_messages, fail=False),
462 expect_messages = 3 * [test_messages]
464 constants.OP_STATUS_SUCCESS,
465 constants.OP_STATUS_SUCCESS,
466 constants.OP_STATUS_ERROR,
467 constants.OP_STATUS_ERROR,
470 elif mode == TM_MULTISUCCESS:
471 ToStdout("Testing multiple successful opcodes")
473 opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
474 log_messages=test_messages, fail=False),
475 opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
476 log_messages=test_messages, fail=False),
478 expect_messages = 2 * [test_messages]
480 constants.OP_STATUS_SUCCESS,
481 constants.OP_STATUS_SUCCESS,
485 if mode == TM_SUCCESS:
486 ToStdout("Testing job success")
487 expect_opstatus = [constants.OP_STATUS_SUCCESS]
488 elif mode == TM_FAIL:
489 ToStdout("Testing job failure")
490 expect_opstatus = [constants.OP_STATUS_ERROR]
492 raise errors.ProgrammerError("Unknown test mode %s" % mode)
495 opcodes.OpTestJqueue(notify_waitlock=True,
497 log_messages=test_messages,
500 expect_messages = [test_messages]
504 cli.SetGenericOpcodeOpts(ops, opts)
506 # Send job to master daemon
507 job_id = cli.SendJob(ops, cl=cl)
509 reporter = _JobQueueTestReporter()
513 results = cli.PollJob(job_id, cl=cl, reporter=reporter)
514 except errors.OpExecError, err:
517 ToStdout("Ignoring error for 'job fail' test: %s", err)
520 raise errors.OpExecError("Job didn't fail when it should")
522 # Check length of result
524 if results is not None:
525 raise errors.OpExecError("Received result from failed job")
526 elif len(results) != expect_resultlen:
527 raise errors.OpExecError("Received %s results (%s), expected %s" %
528 (len(results), results, expect_resultlen))
530 # Check received log messages
531 all_messages = [i for j in expect_messages for i in j]
532 if reporter.GetTestMessages() != all_messages:
533 raise errors.OpExecError("Received test messages don't match input"
534 " (input %r, received %r)" %
535 (all_messages, reporter.GetTestMessages()))
538 reported_job_id = reporter.GetJobId()
539 if reported_job_id != job_id:
540 raise errors.OpExecError("Reported job ID %s doesn't match"
541 "submission job ID %s" %
542 (reported_job_id, job_id))
544 jobdetails = cli.GetClient().QueryJobs([job_id], ["status", "opstatus"])[0]
546 raise errors.OpExecError("Can't find job %s" % job_id)
549 exp_status = constants.JOB_STATUS_ERROR
551 exp_status = constants.JOB_STATUS_SUCCESS
553 (final_status, final_opstatus) = jobdetails
554 if final_status != exp_status:
555 raise errors.OpExecError("Final job status is %s, not %s as expected" %
556 (final_status, exp_status))
557 if len(final_opstatus) != len(ops):
558 raise errors.OpExecError("Did not receive status for all opcodes (got %s,"
560 (len(final_opstatus), len(ops)))
561 if final_opstatus != expect_opstatus:
562 raise errors.OpExecError("Opcode status is %s, expected %s" %
563 (final_opstatus, expect_opstatus))
565 ToStdout("Job queue test successful")
570 def ListLocks(opts, args): # pylint: disable=W0613
573 @param opts: the command line options selected by the user
575 @param args: should be an empty list
577 @return: the desired exit code
580 selected_fields = ParseFields(opts.output, _LIST_LOCKS_DEF_FIELDS)
589 def _FormatPending(value):
590 """Format pending acquires.
593 return utils.CommaJoin("%s:%s" % (mode, ",".join(threads))
594 for mode, threads in value)
598 "mode": (_DashIfNone(str), False),
599 "owner": (_DashIfNone(",".join), False),
600 "pending": (_DashIfNone(_FormatPending), False),
604 ret = GenericList(constants.QR_LOCK, selected_fields, None, None,
605 opts.separator, not opts.no_headers,
606 format_override=fmtoverride, verbose=opts.verbose)
608 if ret != constants.EXIT_SUCCESS:
611 if not opts.interval:
615 time.sleep(opts.interval)
622 Delay, [ArgUnknown(min=1, max=1)],
623 [cli_option("--no-master", dest="on_master", default=True,
624 action="store_false", help="Do not sleep in the master code"),
625 cli_option("-n", dest="on_nodes", default=[],
626 action="append", help="Select nodes to sleep on"),
627 cli_option("-r", "--repeat", type="int", default="0", dest="repeat",
628 help="Number of times to repeat the sleep"),
629 DRY_RUN_OPT, PRIORITY_OPT, SUBMIT_OPT,
631 "[opts...] <duration>", "Executes a TestDelay OpCode"),
633 GenericOpCodes, [ArgFile(min=1)],
635 cli_option("--op-repeat", type="int", default="1", dest="rep_op",
636 help="Repeat the opcode sequence this number of times"),
637 cli_option("--job-repeat", type="int", default="1", dest="rep_job",
638 help="Repeat the job this number of times"),
639 cli_option("--timing-stats", default=False,
640 action="store_true", help="Show timing stats"),
641 cli_option("--each", default=False, action="store_true",
642 help="Submit each job separately"),
643 DRY_RUN_OPT, PRIORITY_OPT,
645 "<op_list_file...>", "Submits jobs built from json files"
646 " containing a list of serialized opcodes"),
648 TestAllocator, [ArgUnknown(min=1)],
649 [cli_option("--dir", dest="direction", default=constants.IALLOCATOR_DIR_IN,
650 choices=list(constants.VALID_IALLOCATOR_DIRECTIONS),
651 help="Show allocator input (in) or allocator"
654 cli_option("-m", "--mode", default="relocate",
655 choices=list(constants.VALID_IALLOCATOR_MODES),
656 help=("Request mode (one of %s)" %
657 utils.CommaJoin(constants.VALID_IALLOCATOR_MODES))),
658 cli_option("--memory", default=128, type="unit",
659 help="Memory size for the instance (MiB)"),
660 cli_option("--disks", default="4096,4096",
661 help="Comma separated list of disk sizes (MiB)"),
663 cli_option("--nics", default="00:11:22:33:44:55",
664 help="Comma separated list of nics, each nic"
665 " definition is of form mac/ip/bridge, if"
666 " missing values are replace by None"),
668 cli_option("-p", "--vcpus", default=1, type="int",
669 help="Select number of VCPUs for the instance"),
670 cli_option("--tags", default=None,
671 help="Comma separated list of tags"),
672 cli_option("--evac-mode", default=constants.IALLOCATOR_NEVAC_ALL,
673 choices=list(constants.IALLOCATOR_NEVAC_MODES),
674 help=("Node evacuation mode (one of %s)" %
675 utils.CommaJoin(constants.IALLOCATOR_NEVAC_MODES))),
676 cli_option("--target-groups", help="Target groups for relocation",
677 default=[], action="append"),
678 cli_option("--spindle-use", help="How many spindles to use",
679 default=1, type="int"),
680 cli_option("--count", help="How many instances to allocate",
681 default=2, type="int"),
682 DRY_RUN_OPT, PRIORITY_OPT,
684 "{opts...} <instance>", "Executes a TestAllocator OpCode"),
686 TestJobqueue, ARGS_NONE, [PRIORITY_OPT],
687 "", "Test a few aspects of the job queue"),
689 ListLocks, ARGS_NONE,
690 [NOHDR_OPT, SEP_OPT, FIELDS_OPT, INTERVAL_OPT, VERBOSE_OPT],
691 "[--interval N]", "Show a list of locks in the master daemon"),
694 #: dictionary with aliases for commands
696 "allocator": "iallocator",
701 return GenericMain(commands, aliases=aliases)