Statistics
| Branch: | Tag: | Revision:

root / lib / client / gnt_debug.py @ dd47a0f0

History | View | Annotate | Download (19.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Debugging commands"""
22

    
23
# pylint: disable-msg=W0401,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0614: Unused import %s from wildcard import (since we need cli)
26
# C0103: Invalid name gnt-backup
27

    
28
import simplejson
29
import time
30
import socket
31
import logging
32

    
33
from ganeti.cli import *
34
from ganeti import cli
35
from ganeti import constants
36
from ganeti import opcodes
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import compat
40

    
41

    
42
#: Default fields for L{ListLocks}
43
_LIST_LOCKS_DEF_FIELDS = [
44
  "name",
45
  "mode",
46
  "owner",
47
  "pending",
48
  ]
49

    
50

    
51
def Delay(opts, args):
52
  """Sleeps for a while
53

54
  @param opts: the command line options selected by the user
55
  @type args: list
56
  @param args: should contain only one element, the duration
57
      the sleep
58
  @rtype: int
59
  @return: the desired exit code
60

61
  """
62
  delay = float(args[0])
63
  op = opcodes.OpTestDelay(duration=delay,
64
                           on_master=opts.on_master,
65
                           on_nodes=opts.on_nodes,
66
                           repeat=opts.repeat)
67
  SubmitOpCode(op, opts=opts)
68

    
69
  return 0
70

    
71

    
72
def GenericOpCodes(opts, args):
73
  """Send any opcode to the master.
74

75
  @param opts: the command line options selected by the user
76
  @type args: list
77
  @param args: should contain only one element, the path of
78
      the file with the opcode definition
79
  @rtype: int
80
  @return: the desired exit code
81

82
  """
83
  cl = cli.GetClient()
84
  jex = cli.JobExecutor(cl=cl, verbose=opts.verbose, opts=opts)
85

    
86
  job_cnt = 0
87
  op_cnt = 0
88
  if opts.timing_stats:
89
    ToStdout("Loading...")
90
  for job_idx in range(opts.rep_job):
91
    for fname in args:
92
      # pylint: disable-msg=W0142
93
      op_data = simplejson.loads(utils.ReadFile(fname))
94
      op_list = [opcodes.OpCode.LoadOpCode(val) for val in op_data]
95
      op_list = op_list * opts.rep_op
96
      jex.QueueJob("file %s/%d" % (fname, job_idx), *op_list)
97
      op_cnt += len(op_list)
98
      job_cnt += 1
99

    
100
  if opts.timing_stats:
101
    t1 = time.time()
102
    ToStdout("Submitting...")
103

    
104
  jex.SubmitPending(each=opts.each)
105

    
106
  if opts.timing_stats:
107
    t2 = time.time()
108
    ToStdout("Executing...")
109

    
110
  jex.GetResults()
111
  if opts.timing_stats:
112
    t3 = time.time()
113
    ToStdout("C:op     %4d" % op_cnt)
114
    ToStdout("C:job    %4d" % job_cnt)
115
    ToStdout("T:submit %4.4f" % (t2-t1))
116
    ToStdout("T:exec   %4.4f" % (t3-t2))
117
    ToStdout("T:total  %4.4f" % (t3-t1))
118
  return 0
119

    
120

    
121
def TestAllocator(opts, args):
122
  """Runs the test allocator opcode.
123

124
  @param opts: the command line options selected by the user
125
  @type args: list
126
  @param args: should contain only one element, the iallocator name
127
  @rtype: int
128
  @return: the desired exit code
129

130
  """
131
  try:
132
    disks = [{
133
      constants.IDISK_SIZE: utils.ParseUnit(val),
134
      constants.IDISK_MODE: constants.DISK_RDWR,
135
      } for val in opts.disks.split(",")]
136
  except errors.UnitParseError, err:
137
    ToStderr("Invalid disks parameter '%s': %s", opts.disks, err)
138
    return 1
139

    
140
  nics = [val.split("/") for val in opts.nics.split(",")]
141
  for row in nics:
142
    while len(row) < 3:
143
      row.append(None)
144
    for i in range(3):
145
      if row[i] == '':
146
        row[i] = None
147
  nic_dict = [{
148
    constants.INIC_MAC: v[0],
149
    constants.INIC_IP: v[1],
150
    # The iallocator interface defines a "bridge" item
151
    "bridge": v[2],
152
    } for v in nics]
153

    
154
  if opts.tags is None:
155
    opts.tags = []
156
  else:
157
    opts.tags = opts.tags.split(",")
158
  if opts.target_groups is None:
159
    target_groups = []
160
  else:
161
    target_groups = opts.target_groups
162

    
163
  op = opcodes.OpTestAllocator(mode=opts.mode,
164
                               name=args[0],
165
                               evac_nodes=args,
166
                               instances=args,
167
                               memory=opts.memory,
168
                               disks=disks,
169
                               disk_template=opts.disk_template,
170
                               nics=nic_dict,
171
                               os=opts.os,
172
                               vcpus=opts.vcpus,
173
                               tags=opts.tags,
174
                               direction=opts.direction,
175
                               allocator=opts.iallocator,
176
                               reloc_mode=opts.reloc_mode,
177
                               target_groups=target_groups)
178
  result = SubmitOpCode(op, opts=opts)
179
  ToStdout("%s" % result)
180
  return 0
181

    
182

    
183
def _TestJobSubmission(opts):
184
  """Tests submitting jobs.
185

186
  """
187
  ToStdout("Testing job submission")
188

    
189
  testdata = [
190
    (0, 0, constants.OP_PRIO_LOWEST),
191
    (0, 0, constants.OP_PRIO_HIGHEST),
192
    ]
193

    
194
  for priority in (constants.OP_PRIO_SUBMIT_VALID |
195
                   frozenset([constants.OP_PRIO_LOWEST,
196
                              constants.OP_PRIO_HIGHEST])):
197
    for offset in [-1, +1]:
198
      testdata.extend([
199
        (0, 0, priority + offset),
200
        (3, 0, priority + offset),
201
        (0, 3, priority + offset),
202
        (4, 2, priority + offset),
203
        ])
204

    
205
  cl = cli.GetClient()
206

    
207
  for before, after, failpriority in testdata:
208
    ops = []
209
    ops.extend([opcodes.OpTestDelay(duration=0) for _ in range(before)])
210
    ops.append(opcodes.OpTestDelay(duration=0, priority=failpriority))
211
    ops.extend([opcodes.OpTestDelay(duration=0) for _ in range(after)])
212

    
213
    try:
214
      cl.SubmitJob(ops)
215
    except errors.GenericError, err:
216
      if opts.debug:
217
        ToStdout("Ignoring error: %s", err)
218
    else:
219
      raise errors.OpExecError("Submitting opcode with priority %s did not"
220
                               " fail when it should (allowed are %s)" %
221
                               (failpriority, constants.OP_PRIO_SUBMIT_VALID))
222

    
223
    jobs = [
224
      [opcodes.OpTestDelay(duration=0),
225
       opcodes.OpTestDelay(duration=0, dry_run=False),
226
       opcodes.OpTestDelay(duration=0, dry_run=True)],
227
      ops,
228
      ]
229
    result = cl.SubmitManyJobs(jobs)
230
    if not (len(result) == 2 and
231
            compat.all(len(i) == 2 for i in result) and
232
            compat.all(isinstance(i[1], basestring) for i in result) and
233
            result[0][0] and not result[1][0]):
234
      raise errors.OpExecError("Submitting multiple jobs did not work as"
235
                               " expected, result %s" % result)
236
    assert len(result) == 2
237

    
238
  ToStdout("Job submission tests were successful")
239

    
240

    
241
class _JobQueueTestReporter(cli.StdioJobPollReportCb):
242
  def __init__(self):
243
    """Initializes this class.
244

245
    """
246
    cli.StdioJobPollReportCb.__init__(self)
247
    self._expected_msgcount = 0
248
    self._all_testmsgs = []
249
    self._testmsgs = None
250
    self._job_id = None
251

    
252
  def GetTestMessages(self):
253
    """Returns all test log messages received so far.
254

255
    """
256
    return self._all_testmsgs
257

    
258
  def GetJobId(self):
259
    """Returns the job ID.
260

261
    """
262
    return self._job_id
263

    
264
  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
265
    """Handles a log message.
266

267
    """
268
    if self._job_id is None:
269
      self._job_id = job_id
270
    elif self._job_id != job_id:
271
      raise errors.ProgrammerError("The same reporter instance was used for"
272
                                   " more than one job")
273

    
274
    if log_type == constants.ELOG_JQUEUE_TEST:
275
      (sockname, test, arg) = log_msg
276
      return self._ProcessTestMessage(job_id, sockname, test, arg)
277

    
278
    elif (log_type == constants.ELOG_MESSAGE and
279
          log_msg.startswith(constants.JQT_MSGPREFIX)):
280
      if self._testmsgs is None:
281
        raise errors.OpExecError("Received test message without a preceding"
282
                                 " start message")
283
      testmsg = log_msg[len(constants.JQT_MSGPREFIX):]
284
      self._testmsgs.append(testmsg)
285
      self._all_testmsgs.append(testmsg)
286
      return
287

    
288
    return cli.StdioJobPollReportCb.ReportLogMessage(self, job_id, serial,
289
                                                     timestamp, log_type,
290
                                                     log_msg)
291

    
292
  def _ProcessTestMessage(self, job_id, sockname, test, arg):
293
    """Handles a job queue test message.
294

295
    """
296
    if test not in constants.JQT_ALL:
297
      raise errors.OpExecError("Received invalid test message %s" % test)
298

    
299
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
300
    try:
301
      sock.settimeout(30.0)
302

    
303
      logging.debug("Connecting to %s", sockname)
304
      sock.connect(sockname)
305

    
306
      logging.debug("Checking status")
307
      jobdetails = cli.GetClient().QueryJobs([job_id], ["status"])[0]
308
      if not jobdetails:
309
        raise errors.OpExecError("Can't find job %s" % job_id)
310

    
311
      status = jobdetails[0]
312

    
313
      logging.debug("Status of job %s is %s", job_id, status)
314

    
315
      if test == constants.JQT_EXPANDNAMES:
316
        if status != constants.JOB_STATUS_WAITLOCK:
317
          raise errors.OpExecError("Job status while expanding names is '%s',"
318
                                   " not '%s' as expected" %
319
                                   (status, constants.JOB_STATUS_WAITLOCK))
320
      elif test in (constants.JQT_EXEC, constants.JQT_LOGMSG):
321
        if status != constants.JOB_STATUS_RUNNING:
322
          raise errors.OpExecError("Job status while executing opcode is '%s',"
323
                                   " not '%s' as expected" %
324
                                   (status, constants.JOB_STATUS_RUNNING))
325

    
326
      if test == constants.JQT_STARTMSG:
327
        logging.debug("Expecting %s test messages", arg)
328
        self._testmsgs = []
329
      elif test == constants.JQT_LOGMSG:
330
        if len(self._testmsgs) != arg:
331
          raise errors.OpExecError("Received %s test messages when %s are"
332
                                   " expected" % (len(self._testmsgs), arg))
333
    finally:
334
      logging.debug("Closing socket")
335
      sock.close()
336

    
337

    
338
def TestJobqueue(opts, _):
339
  """Runs a few tests on the job queue.
340

341
  """
342
  _TestJobSubmission(opts)
343

    
344
  (TM_SUCCESS,
345
   TM_MULTISUCCESS,
346
   TM_FAIL,
347
   TM_PARTFAIL) = range(4)
348
  TM_ALL = frozenset([TM_SUCCESS, TM_MULTISUCCESS, TM_FAIL, TM_PARTFAIL])
349

    
350
  for mode in TM_ALL:
351
    test_messages = [
352
      "Testing mode %s" % mode,
353
      "Hello World",
354
      "A",
355
      "",
356
      "B"
357
      "Foo|bar|baz",
358
      utils.TimestampForFilename(),
359
      ]
360

    
361
    fail = mode in (TM_FAIL, TM_PARTFAIL)
362

    
363
    if mode == TM_PARTFAIL:
364
      ToStdout("Testing partial job failure")
365
      ops = [
366
        opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
367
                             log_messages=test_messages, fail=False),
368
        opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
369
                             log_messages=test_messages, fail=False),
370
        opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
371
                             log_messages=test_messages, fail=True),
372
        opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
373
                             log_messages=test_messages, fail=False),
374
        ]
375
      expect_messages = 3 * [test_messages]
376
      expect_opstatus = [
377
        constants.OP_STATUS_SUCCESS,
378
        constants.OP_STATUS_SUCCESS,
379
        constants.OP_STATUS_ERROR,
380
        constants.OP_STATUS_ERROR,
381
        ]
382
      expect_resultlen = 2
383
    elif mode == TM_MULTISUCCESS:
384
      ToStdout("Testing multiple successful opcodes")
385
      ops = [
386
        opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
387
                             log_messages=test_messages, fail=False),
388
        opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
389
                             log_messages=test_messages, fail=False),
390
        ]
391
      expect_messages = 2 * [test_messages]
392
      expect_opstatus = [
393
        constants.OP_STATUS_SUCCESS,
394
        constants.OP_STATUS_SUCCESS,
395
        ]
396
      expect_resultlen = 2
397
    else:
398
      if mode == TM_SUCCESS:
399
        ToStdout("Testing job success")
400
        expect_opstatus = [constants.OP_STATUS_SUCCESS]
401
      elif mode == TM_FAIL:
402
        ToStdout("Testing job failure")
403
        expect_opstatus = [constants.OP_STATUS_ERROR]
404
      else:
405
        raise errors.ProgrammerError("Unknown test mode %s" % mode)
406

    
407
      ops = [
408
        opcodes.OpTestJqueue(notify_waitlock=True,
409
                             notify_exec=True,
410
                             log_messages=test_messages,
411
                             fail=fail)
412
        ]
413
      expect_messages = [test_messages]
414
      expect_resultlen = 1
415

    
416
    cl = cli.GetClient()
417
    cli.SetGenericOpcodeOpts(ops, opts)
418

    
419
    # Send job to master daemon
420
    job_id = cli.SendJob(ops, cl=cl)
421

    
422
    reporter = _JobQueueTestReporter()
423
    results = None
424

    
425
    try:
426
      results = cli.PollJob(job_id, cl=cl, reporter=reporter)
427
    except errors.OpExecError, err:
428
      if not fail:
429
        raise
430
      ToStdout("Ignoring error: %s", err)
431
    else:
432
      if fail:
433
        raise errors.OpExecError("Job didn't fail when it should")
434

    
435
    # Check length of result
436
    if fail:
437
      if results is not None:
438
        raise errors.OpExecError("Received result from failed job")
439
    elif len(results) != expect_resultlen:
440
      raise errors.OpExecError("Received %s results (%s), expected %s" %
441
                               (len(results), results, expect_resultlen))
442

    
443
    # Check received log messages
444
    all_messages = [i for j in expect_messages for i in j]
445
    if reporter.GetTestMessages() != all_messages:
446
      raise errors.OpExecError("Received test messages don't match input"
447
                               " (input %r, received %r)" %
448
                               (all_messages, reporter.GetTestMessages()))
449

    
450
    # Check final status
451
    reported_job_id = reporter.GetJobId()
452
    if reported_job_id != job_id:
453
      raise errors.OpExecError("Reported job ID %s doesn't match"
454
                               "submission job ID %s" %
455
                               (reported_job_id, job_id))
456

    
457
    jobdetails = cli.GetClient().QueryJobs([job_id], ["status", "opstatus"])[0]
458
    if not jobdetails:
459
      raise errors.OpExecError("Can't find job %s" % job_id)
460

    
461
    if fail:
462
      exp_status = constants.JOB_STATUS_ERROR
463
    else:
464
      exp_status = constants.JOB_STATUS_SUCCESS
465

    
466
    (final_status, final_opstatus) = jobdetails
467
    if final_status != exp_status:
468
      raise errors.OpExecError("Final job status is %s, not %s as expected" %
469
                               (final_status, exp_status))
470
    if len(final_opstatus) != len(ops):
471
      raise errors.OpExecError("Did not receive status for all opcodes (got %s,"
472
                               " expected %s)" %
473
                               (len(final_opstatus), len(ops)))
474
    if final_opstatus != expect_opstatus:
475
      raise errors.OpExecError("Opcode status is %s, expected %s" %
476
                               (final_opstatus, expect_opstatus))
477

    
478
  ToStdout("Job queue test successful")
479

    
480
  return 0
481

    
482

    
483
def ListLocks(opts, args): # pylint: disable-msg=W0613
484
  """List all locks.
485

486
  @param opts: the command line options selected by the user
487
  @type args: list
488
  @param args: should be an empty list
489
  @rtype: int
490
  @return: the desired exit code
491

492
  """
493
  selected_fields = ParseFields(opts.output, _LIST_LOCKS_DEF_FIELDS)
494

    
495
  def _DashIfNone(fn):
496
    def wrapper(value):
497
      if not value:
498
        return "-"
499
      return fn(value)
500
    return wrapper
501

    
502
  def _FormatPending(value):
503
    """Format pending acquires.
504

505
    """
506
    return utils.CommaJoin("%s:%s" % (mode, ",".join(threads))
507
                           for mode, threads in value)
508

    
509
  # Format raw values
510
  fmtoverride = {
511
    "mode": (_DashIfNone(str), False),
512
    "owner": (_DashIfNone(",".join), False),
513
    "pending": (_DashIfNone(_FormatPending), False),
514
    }
515

    
516
  while True:
517
    ret = GenericList(constants.QR_LOCK, selected_fields, None, None,
518
                      opts.separator, not opts.no_headers,
519
                      format_override=fmtoverride, verbose=opts.verbose)
520

    
521
    if ret != constants.EXIT_SUCCESS:
522
      return ret
523

    
524
    if not opts.interval:
525
      break
526

    
527
    ToStdout("")
528
    time.sleep(opts.interval)
529

    
530
  return 0
531

    
532

    
533
commands = {
534
  'delay': (
535
    Delay, [ArgUnknown(min=1, max=1)],
536
    [cli_option("--no-master", dest="on_master", default=True,
537
                action="store_false", help="Do not sleep in the master code"),
538
     cli_option("-n", dest="on_nodes", default=[],
539
                action="append", help="Select nodes to sleep on"),
540
     cli_option("-r", "--repeat", type="int", default="0", dest="repeat",
541
                help="Number of times to repeat the sleep"),
542
     DRY_RUN_OPT, PRIORITY_OPT,
543
     ],
544
    "[opts...] <duration>", "Executes a TestDelay OpCode"),
545
  'submit-job': (
546
    GenericOpCodes, [ArgFile(min=1)],
547
    [VERBOSE_OPT,
548
     cli_option("--op-repeat", type="int", default="1", dest="rep_op",
549
                help="Repeat the opcode sequence this number of times"),
550
     cli_option("--job-repeat", type="int", default="1", dest="rep_job",
551
                help="Repeat the job this number of times"),
552
     cli_option("--timing-stats", default=False,
553
                action="store_true", help="Show timing stats"),
554
     cli_option("--each", default=False, action="store_true",
555
                help="Submit each job separately"),
556
     DRY_RUN_OPT, PRIORITY_OPT,
557
     ],
558
    "<op_list_file...>", "Submits jobs built from json files"
559
    " containing a list of serialized opcodes"),
560
  'iallocator': (
561
    TestAllocator, [ArgUnknown(min=1)],
562
    [cli_option("--dir", dest="direction", default=constants.IALLOCATOR_DIR_IN,
563
                choices=list(constants.VALID_IALLOCATOR_DIRECTIONS),
564
                help="Show allocator input (in) or allocator"
565
                " results (out)"),
566
     IALLOCATOR_OPT,
567
     cli_option("-m", "--mode", default="relocate",
568
                choices=list(constants.VALID_IALLOCATOR_MODES),
569
                help=("Request mode (one of %s)" %
570
                      utils.CommaJoin(constants.VALID_IALLOCATOR_MODES))),
571
     cli_option("--memory", default=128, type="unit",
572
                help="Memory size for the instance (MiB)"),
573
     cli_option("--disks", default="4096,4096",
574
                help="Comma separated list of disk sizes (MiB)"),
575
     DISK_TEMPLATE_OPT,
576
     cli_option("--nics", default="00:11:22:33:44:55",
577
                help="Comma separated list of nics, each nic"
578
                " definition is of form mac/ip/bridge, if"
579
                " missing values are replace by None"),
580
     OS_OPT,
581
     cli_option("-p", "--vcpus", default=1, type="int",
582
                help="Select number of VCPUs for the instance"),
583
     cli_option("--tags", default=None,
584
                help="Comma separated list of tags"),
585
     cli_option("--reloc-mode", default=constants.IALLOCATOR_MRELOC_ANY,
586
                choices=list(constants.IALLOCATOR_MRELOC_MODES),
587
                help=("Instance relocation mode (one of %s)" %
588
                      utils.CommaJoin(constants.IALLOCATOR_MRELOC_MODES))),
589
     cli_option("--target-groups", help="Target groups for relocation"),
590
     DRY_RUN_OPT, PRIORITY_OPT,
591
     ],
592
    "{opts...} <instance>", "Executes a TestAllocator OpCode"),
593
  "test-jobqueue": (
594
    TestJobqueue, ARGS_NONE, [PRIORITY_OPT],
595
    "", "Test a few aspects of the job queue"),
596
  "locks": (
597
    ListLocks, ARGS_NONE,
598
    [NOHDR_OPT, SEP_OPT, FIELDS_OPT, INTERVAL_OPT, VERBOSE_OPT],
599
    "[--interval N]", "Show a list of locks in the master daemon"),
600
  }
601

    
602
#: dictionary with aliases for commands
603
aliases = {
604
  "allocator": "iallocator",
605
  }
606

    
607
def Main():
608
  return GenericMain(commands, aliases=aliases)