Rename OpAssignGroupNodes and LUAssignGroupNodes
[ganeti-local] / lib / client / gnt_debug.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Debugging commands"""
22
23 # pylint: disable-msg=W0401,W0614,C0103
24 # W0401: Wildcard import ganeti.cli
25 # W0614: Unused import %s from wildcard import (since we need cli)
26 # C0103: Invalid name gnt-backup
27
28 import simplejson
29 import time
30 import socket
31 import logging
32
33 from ganeti.cli import *
34 from ganeti import cli
35 from ganeti import constants
36 from ganeti import opcodes
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import compat
40
41
42 #: Default fields for L{ListLocks}
43 _LIST_LOCKS_DEF_FIELDS = [
44   "name",
45   "mode",
46   "owner",
47   "pending",
48   ]
49
50
51 def Delay(opts, args):
52   """Sleeps for a while
53
54   @param opts: the command line options selected by the user
55   @type args: list
56   @param args: should contain only one element, the duration
57       the sleep
58   @rtype: int
59   @return: the desired exit code
60
61   """
62   delay = float(args[0])
63   op = opcodes.OpTestDelay(duration=delay,
64                            on_master=opts.on_master,
65                            on_nodes=opts.on_nodes,
66                            repeat=opts.repeat)
67   SubmitOpCode(op, opts=opts)
68
69   return 0
70
71
72 def GenericOpCodes(opts, args):
73   """Send any opcode to the master.
74
75   @param opts: the command line options selected by the user
76   @type args: list
77   @param args: should contain only one element, the path of
78       the file with the opcode definition
79   @rtype: int
80   @return: the desired exit code
81
82   """
83   cl = cli.GetClient()
84   jex = cli.JobExecutor(cl=cl, verbose=opts.verbose, opts=opts)
85
86   job_cnt = 0
87   op_cnt = 0
88   if opts.timing_stats:
89     ToStdout("Loading...")
90   for job_idx in range(opts.rep_job):
91     for fname in args:
92       # pylint: disable-msg=W0142
93       op_data = simplejson.loads(utils.ReadFile(fname))
94       op_list = [opcodes.OpCode.LoadOpCode(val) for val in op_data]
95       op_list = op_list * opts.rep_op
96       jex.QueueJob("file %s/%d" % (fname, job_idx), *op_list)
97       op_cnt += len(op_list)
98       job_cnt += 1
99
100   if opts.timing_stats:
101     t1 = time.time()
102     ToStdout("Submitting...")
103
104   jex.SubmitPending(each=opts.each)
105
106   if opts.timing_stats:
107     t2 = time.time()
108     ToStdout("Executing...")
109
110   jex.GetResults()
111   if opts.timing_stats:
112     t3 = time.time()
113     ToStdout("C:op     %4d" % op_cnt)
114     ToStdout("C:job    %4d" % job_cnt)
115     ToStdout("T:submit %4.4f" % (t2-t1))
116     ToStdout("T:exec   %4.4f" % (t3-t2))
117     ToStdout("T:total  %4.4f" % (t3-t1))
118   return 0
119
120
121 def TestAllocator(opts, args):
122   """Runs the test allocator opcode.
123
124   @param opts: the command line options selected by the user
125   @type args: list
126   @param args: should contain only one element, the iallocator name
127   @rtype: int
128   @return: the desired exit code
129
130   """
131   try:
132     disks = [{"size": utils.ParseUnit(val), "mode": 'w'}
133              for val in opts.disks.split(",")]
134   except errors.UnitParseError, err:
135     ToStderr("Invalid disks parameter '%s': %s", opts.disks, err)
136     return 1
137
138   nics = [val.split("/") for val in opts.nics.split(",")]
139   for row in nics:
140     while len(row) < 3:
141       row.append(None)
142     for i in range(3):
143       if row[i] == '':
144         row[i] = None
145   nic_dict = [{"mac": v[0], "ip": v[1], "bridge": v[2]} for v in nics]
146
147   if opts.tags is None:
148     opts.tags = []
149   else:
150     opts.tags = opts.tags.split(",")
151
152   op = opcodes.OpTestAllocator(mode=opts.mode,
153                                name=args[0],
154                                evac_nodes=args,
155                                mem_size=opts.mem,
156                                disks=disks,
157                                disk_template=opts.disk_template,
158                                nics=nic_dict,
159                                os=opts.os,
160                                vcpus=opts.vcpus,
161                                tags=opts.tags,
162                                direction=opts.direction,
163                                allocator=opts.iallocator,
164                                )
165   result = SubmitOpCode(op, opts=opts)
166   ToStdout("%s" % result)
167   return 0
168
169
170 def _TestJobSubmission(opts):
171   """Tests submitting jobs.
172
173   """
174   ToStdout("Testing job submission")
175
176   testdata = [
177     (0, 0, constants.OP_PRIO_LOWEST),
178     (0, 0, constants.OP_PRIO_HIGHEST),
179     ]
180
181   for priority in (constants.OP_PRIO_SUBMIT_VALID |
182                    frozenset([constants.OP_PRIO_LOWEST,
183                               constants.OP_PRIO_HIGHEST])):
184     for offset in [-1, +1]:
185       testdata.extend([
186         (0, 0, priority + offset),
187         (3, 0, priority + offset),
188         (0, 3, priority + offset),
189         (4, 2, priority + offset),
190         ])
191
192   cl = cli.GetClient()
193
194   for before, after, failpriority in testdata:
195     ops = []
196     ops.extend([opcodes.OpTestDelay(duration=0) for _ in range(before)])
197     ops.append(opcodes.OpTestDelay(duration=0, priority=failpriority))
198     ops.extend([opcodes.OpTestDelay(duration=0) for _ in range(after)])
199
200     try:
201       cl.SubmitJob(ops)
202     except errors.GenericError, err:
203       if opts.debug:
204         ToStdout("Ignoring error: %s", err)
205     else:
206       raise errors.OpExecError("Submitting opcode with priority %s did not"
207                                " fail when it should (allowed are %s)" %
208                                (failpriority, constants.OP_PRIO_SUBMIT_VALID))
209
210     jobs = [
211       [opcodes.OpTestDelay(duration=0),
212        opcodes.OpTestDelay(duration=0, dry_run=False),
213        opcodes.OpTestDelay(duration=0, dry_run=True)],
214       ops,
215       ]
216     result = cl.SubmitManyJobs(jobs)
217     if not (len(result) == 2 and
218             compat.all(len(i) == 2 for i in result) and
219             compat.all(isinstance(i[1], basestring) for i in result) and
220             result[0][0] and not result[1][0]):
221       raise errors.OpExecError("Submitting multiple jobs did not work as"
222                                " expected, result %s" % result)
223     assert len(result) == 2
224
225   ToStdout("Job submission tests were successful")
226
227
228 class _JobQueueTestReporter(cli.StdioJobPollReportCb):
229   def __init__(self):
230     """Initializes this class.
231
232     """
233     cli.StdioJobPollReportCb.__init__(self)
234     self._expected_msgcount = 0
235     self._all_testmsgs = []
236     self._testmsgs = None
237     self._job_id = None
238
239   def GetTestMessages(self):
240     """Returns all test log messages received so far.
241
242     """
243     return self._all_testmsgs
244
245   def GetJobId(self):
246     """Returns the job ID.
247
248     """
249     return self._job_id
250
251   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
252     """Handles a log message.
253
254     """
255     if self._job_id is None:
256       self._job_id = job_id
257     elif self._job_id != job_id:
258       raise errors.ProgrammerError("The same reporter instance was used for"
259                                    " more than one job")
260
261     if log_type == constants.ELOG_JQUEUE_TEST:
262       (sockname, test, arg) = log_msg
263       return self._ProcessTestMessage(job_id, sockname, test, arg)
264
265     elif (log_type == constants.ELOG_MESSAGE and
266           log_msg.startswith(constants.JQT_MSGPREFIX)):
267       if self._testmsgs is None:
268         raise errors.OpExecError("Received test message without a preceding"
269                                  " start message")
270       testmsg = log_msg[len(constants.JQT_MSGPREFIX):]
271       self._testmsgs.append(testmsg)
272       self._all_testmsgs.append(testmsg)
273       return
274
275     return cli.StdioJobPollReportCb.ReportLogMessage(self, job_id, serial,
276                                                      timestamp, log_type,
277                                                      log_msg)
278
279   def _ProcessTestMessage(self, job_id, sockname, test, arg):
280     """Handles a job queue test message.
281
282     """
283     if test not in constants.JQT_ALL:
284       raise errors.OpExecError("Received invalid test message %s" % test)
285
286     sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
287     try:
288       sock.settimeout(30.0)
289
290       logging.debug("Connecting to %s", sockname)
291       sock.connect(sockname)
292
293       logging.debug("Checking status")
294       jobdetails = cli.GetClient().QueryJobs([job_id], ["status"])[0]
295       if not jobdetails:
296         raise errors.OpExecError("Can't find job %s" % job_id)
297
298       status = jobdetails[0]
299
300       logging.debug("Status of job %s is %s", job_id, status)
301
302       if test == constants.JQT_EXPANDNAMES:
303         if status != constants.JOB_STATUS_WAITLOCK:
304           raise errors.OpExecError("Job status while expanding names is '%s',"
305                                    " not '%s' as expected" %
306                                    (status, constants.JOB_STATUS_WAITLOCK))
307       elif test in (constants.JQT_EXEC, constants.JQT_LOGMSG):
308         if status != constants.JOB_STATUS_RUNNING:
309           raise errors.OpExecError("Job status while executing opcode is '%s',"
310                                    " not '%s' as expected" %
311                                    (status, constants.JOB_STATUS_RUNNING))
312
313       if test == constants.JQT_STARTMSG:
314         logging.debug("Expecting %s test messages", arg)
315         self._testmsgs = []
316       elif test == constants.JQT_LOGMSG:
317         if len(self._testmsgs) != arg:
318           raise errors.OpExecError("Received %s test messages when %s are"
319                                    " expected" % (len(self._testmsgs), arg))
320     finally:
321       logging.debug("Closing socket")
322       sock.close()
323
324
325 def TestJobqueue(opts, _):
326   """Runs a few tests on the job queue.
327
328   """
329   _TestJobSubmission(opts)
330
331   (TM_SUCCESS,
332    TM_MULTISUCCESS,
333    TM_FAIL,
334    TM_PARTFAIL) = range(4)
335   TM_ALL = frozenset([TM_SUCCESS, TM_MULTISUCCESS, TM_FAIL, TM_PARTFAIL])
336
337   for mode in TM_ALL:
338     test_messages = [
339       "Testing mode %s" % mode,
340       "Hello World",
341       "A",
342       "",
343       "B"
344       "Foo|bar|baz",
345       utils.TimestampForFilename(),
346       ]
347
348     fail = mode in (TM_FAIL, TM_PARTFAIL)
349
350     if mode == TM_PARTFAIL:
351       ToStdout("Testing partial job failure")
352       ops = [
353         opcodes.OpTestJobqueue(notify_waitlock=True, notify_exec=True,
354                                log_messages=test_messages, fail=False),
355         opcodes.OpTestJobqueue(notify_waitlock=True, notify_exec=True,
356                                log_messages=test_messages, fail=False),
357         opcodes.OpTestJobqueue(notify_waitlock=True, notify_exec=True,
358                                log_messages=test_messages, fail=True),
359         opcodes.OpTestJobqueue(notify_waitlock=True, notify_exec=True,
360                                log_messages=test_messages, fail=False),
361         ]
362       expect_messages = 3 * [test_messages]
363       expect_opstatus = [
364         constants.OP_STATUS_SUCCESS,
365         constants.OP_STATUS_SUCCESS,
366         constants.OP_STATUS_ERROR,
367         constants.OP_STATUS_ERROR,
368         ]
369       expect_resultlen = 2
370     elif mode == TM_MULTISUCCESS:
371       ToStdout("Testing multiple successful opcodes")
372       ops = [
373         opcodes.OpTestJobqueue(notify_waitlock=True, notify_exec=True,
374                                log_messages=test_messages, fail=False),
375         opcodes.OpTestJobqueue(notify_waitlock=True, notify_exec=True,
376                                log_messages=test_messages, fail=False),
377         ]
378       expect_messages = 2 * [test_messages]
379       expect_opstatus = [
380         constants.OP_STATUS_SUCCESS,
381         constants.OP_STATUS_SUCCESS,
382         ]
383       expect_resultlen = 2
384     else:
385       if mode == TM_SUCCESS:
386         ToStdout("Testing job success")
387         expect_opstatus = [constants.OP_STATUS_SUCCESS]
388       elif mode == TM_FAIL:
389         ToStdout("Testing job failure")
390         expect_opstatus = [constants.OP_STATUS_ERROR]
391       else:
392         raise errors.ProgrammerError("Unknown test mode %s" % mode)
393
394       ops = [
395         opcodes.OpTestJobqueue(notify_waitlock=True,
396                                notify_exec=True,
397                                log_messages=test_messages,
398                                fail=fail)
399         ]
400       expect_messages = [test_messages]
401       expect_resultlen = 1
402
403     cl = cli.GetClient()
404     cli.SetGenericOpcodeOpts(ops, opts)
405
406     # Send job to master daemon
407     job_id = cli.SendJob(ops, cl=cl)
408
409     reporter = _JobQueueTestReporter()
410     results = None
411
412     try:
413       results = cli.PollJob(job_id, cl=cl, reporter=reporter)
414     except errors.OpExecError, err:
415       if not fail:
416         raise
417       ToStdout("Ignoring error: %s", err)
418     else:
419       if fail:
420         raise errors.OpExecError("Job didn't fail when it should")
421
422     # Check length of result
423     if fail:
424       if results is not None:
425         raise errors.OpExecError("Received result from failed job")
426     elif len(results) != expect_resultlen:
427       raise errors.OpExecError("Received %s results (%s), expected %s" %
428                                (len(results), results, expect_resultlen))
429
430     # Check received log messages
431     all_messages = [i for j in expect_messages for i in j]
432     if reporter.GetTestMessages() != all_messages:
433       raise errors.OpExecError("Received test messages don't match input"
434                                " (input %r, received %r)" %
435                                (all_messages, reporter.GetTestMessages()))
436
437     # Check final status
438     reported_job_id = reporter.GetJobId()
439     if reported_job_id != job_id:
440       raise errors.OpExecError("Reported job ID %s doesn't match"
441                                "submission job ID %s" %
442                                (reported_job_id, job_id))
443
444     jobdetails = cli.GetClient().QueryJobs([job_id], ["status", "opstatus"])[0]
445     if not jobdetails:
446       raise errors.OpExecError("Can't find job %s" % job_id)
447
448     if fail:
449       exp_status = constants.JOB_STATUS_ERROR
450     else:
451       exp_status = constants.JOB_STATUS_SUCCESS
452
453     (final_status, final_opstatus) = jobdetails
454     if final_status != exp_status:
455       raise errors.OpExecError("Final job status is %s, not %s as expected" %
456                                (final_status, exp_status))
457     if len(final_opstatus) != len(ops):
458       raise errors.OpExecError("Did not receive status for all opcodes (got %s,"
459                                " expected %s)" %
460                                (len(final_opstatus), len(ops)))
461     if final_opstatus != expect_opstatus:
462       raise errors.OpExecError("Opcode status is %s, expected %s" %
463                                (final_opstatus, expect_opstatus))
464
465   ToStdout("Job queue test successful")
466
467   return 0
468
469
470 def ListLocks(opts, args): # pylint: disable-msg=W0613
471   """List all locks.
472
473   @param opts: the command line options selected by the user
474   @type args: list
475   @param args: should be an empty list
476   @rtype: int
477   @return: the desired exit code
478
479   """
480   selected_fields = ParseFields(opts.output, _LIST_LOCKS_DEF_FIELDS)
481
482   def _DashIfNone(fn):
483     def wrapper(value):
484       if not value:
485         return "-"
486       return fn(value)
487     return wrapper
488
489   def _FormatPending(value):
490     """Format pending acquires.
491
492     """
493     return utils.CommaJoin("%s:%s" % (mode, ",".join(threads))
494                            for mode, threads in value)
495
496   # Format raw values
497   fmtoverride = {
498     "mode": (_DashIfNone(str), False),
499     "owner": (_DashIfNone(",".join), False),
500     "pending": (_DashIfNone(_FormatPending), False),
501     }
502
503   while True:
504     ret = GenericList(constants.QR_LOCK, selected_fields, None, None,
505                       opts.separator, not opts.no_headers,
506                       format_override=fmtoverride)
507
508     if ret != constants.EXIT_SUCCESS:
509       return ret
510
511     if not opts.interval:
512       break
513
514     ToStdout("")
515     time.sleep(opts.interval)
516
517   return 0
518
519
520 commands = {
521   'delay': (
522     Delay, [ArgUnknown(min=1, max=1)],
523     [cli_option("--no-master", dest="on_master", default=True,
524                 action="store_false", help="Do not sleep in the master code"),
525      cli_option("-n", dest="on_nodes", default=[],
526                 action="append", help="Select nodes to sleep on"),
527      cli_option("-r", "--repeat", type="int", default="0", dest="repeat",
528                 help="Number of times to repeat the sleep"),
529      DRY_RUN_OPT, PRIORITY_OPT,
530      ],
531     "[opts...] <duration>", "Executes a TestDelay OpCode"),
532   'submit-job': (
533     GenericOpCodes, [ArgFile(min=1)],
534     [VERBOSE_OPT,
535      cli_option("--op-repeat", type="int", default="1", dest="rep_op",
536                 help="Repeat the opcode sequence this number of times"),
537      cli_option("--job-repeat", type="int", default="1", dest="rep_job",
538                 help="Repeat the job this number of times"),
539      cli_option("--timing-stats", default=False,
540                 action="store_true", help="Show timing stats"),
541      cli_option("--each", default=False, action="store_true",
542                 help="Submit each job separately"),
543      DRY_RUN_OPT, PRIORITY_OPT,
544      ],
545     "<op_list_file...>", "Submits jobs built from json files"
546     " containing a list of serialized opcodes"),
547   'allocator': (
548     TestAllocator, [ArgUnknown(min=1)],
549     [cli_option("--dir", dest="direction",
550                 default="in", choices=["in", "out"],
551                 help="Show allocator input (in) or allocator"
552                 " results (out)"),
553      IALLOCATOR_OPT,
554      cli_option("-m", "--mode", default="relocate",
555                 choices=["relocate", "allocate", "multi-evacuate"],
556                 help="Request mode, either allocate or relocate"),
557      cli_option("--mem", default=128, type="unit",
558                 help="Memory size for the instance (MiB)"),
559      cli_option("--disks", default="4096,4096",
560                 help="Comma separated list of disk sizes (MiB)"),
561      DISK_TEMPLATE_OPT,
562      cli_option("--nics", default="00:11:22:33:44:55",
563                 help="Comma separated list of nics, each nic"
564                 " definition is of form mac/ip/bridge, if"
565                 " missing values are replace by None"),
566      OS_OPT,
567      cli_option("-p", "--vcpus", default=1, type="int",
568                 help="Select number of VCPUs for the instance"),
569      cli_option("--tags", default=None,
570                 help="Comma separated list of tags"),
571      DRY_RUN_OPT, PRIORITY_OPT,
572      ],
573     "{opts...} <instance>", "Executes a TestAllocator OpCode"),
574   "test-jobqueue": (
575     TestJobqueue, ARGS_NONE, [PRIORITY_OPT],
576     "", "Test a few aspects of the job queue"),
577   "locks": (
578     ListLocks, ARGS_NONE, [NOHDR_OPT, SEP_OPT, FIELDS_OPT, INTERVAL_OPT],
579     "[--interval N]", "Show a list of locks in the master daemon"),
580   }
581
582
583 def Main():
584   return GenericMain(commands)