Statistics
| Branch: | Tag: | Revision:

root / lib / client / gnt_debug.py @ 42c161cf

History | View | Annotate | Download (19.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Debugging commands"""
22

    
23
# pylint: disable-msg=W0401,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0614: Unused import %s from wildcard import (since we need cli)
26
# C0103: Invalid name gnt-backup
27

    
28
import simplejson
29
import time
30
import socket
31
import logging
32

    
33
from ganeti.cli import *
34
from ganeti import cli
35
from ganeti import constants
36
from ganeti import opcodes
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import compat
40

    
41

    
42
#: Default fields for L{ListLocks}
43
_LIST_LOCKS_DEF_FIELDS = [
44
  "name",
45
  "mode",
46
  "owner",
47
  "pending",
48
  ]
49

    
50

    
51
def Delay(opts, args):
52
  """Sleeps for a while
53

54
  @param opts: the command line options selected by the user
55
  @type args: list
56
  @param args: should contain only one element, the duration
57
      the sleep
58
  @rtype: int
59
  @return: the desired exit code
60

61
  """
62
  delay = float(args[0])
63
  op = opcodes.OpTestDelay(duration=delay,
64
                           on_master=opts.on_master,
65
                           on_nodes=opts.on_nodes,
66
                           repeat=opts.repeat)
67
  SubmitOpCode(op, opts=opts)
68

    
69
  return 0
70

    
71

    
72
def GenericOpCodes(opts, args):
73
  """Send any opcode to the master.
74

75
  @param opts: the command line options selected by the user
76
  @type args: list
77
  @param args: should contain only one element, the path of
78
      the file with the opcode definition
79
  @rtype: int
80
  @return: the desired exit code
81

82
  """
83
  cl = cli.GetClient()
84
  jex = cli.JobExecutor(cl=cl, verbose=opts.verbose, opts=opts)
85

    
86
  job_cnt = 0
87
  op_cnt = 0
88
  if opts.timing_stats:
89
    ToStdout("Loading...")
90
  for job_idx in range(opts.rep_job):
91
    for fname in args:
92
      # pylint: disable-msg=W0142
93
      op_data = simplejson.loads(utils.ReadFile(fname))
94
      op_list = [opcodes.OpCode.LoadOpCode(val) for val in op_data]
95
      op_list = op_list * opts.rep_op
96
      jex.QueueJob("file %s/%d" % (fname, job_idx), *op_list)
97
      op_cnt += len(op_list)
98
      job_cnt += 1
99

    
100
  if opts.timing_stats:
101
    t1 = time.time()
102
    ToStdout("Submitting...")
103

    
104
  jex.SubmitPending(each=opts.each)
105

    
106
  if opts.timing_stats:
107
    t2 = time.time()
108
    ToStdout("Executing...")
109

    
110
  jex.GetResults()
111
  if opts.timing_stats:
112
    t3 = time.time()
113
    ToStdout("C:op     %4d" % op_cnt)
114
    ToStdout("C:job    %4d" % job_cnt)
115
    ToStdout("T:submit %4.4f" % (t2-t1))
116
    ToStdout("T:exec   %4.4f" % (t3-t2))
117
    ToStdout("T:total  %4.4f" % (t3-t1))
118
  return 0
119

    
120

    
121
def TestAllocator(opts, args):
122
  """Runs the test allocator opcode.
123

124
  @param opts: the command line options selected by the user
125
  @type args: list
126
  @param args: should contain only one element, the iallocator name
127
  @rtype: int
128
  @return: the desired exit code
129

130
  """
131
  try:
132
    disks = [{
133
      constants.IDISK_SIZE: utils.ParseUnit(val),
134
      constants.IDISK_MODE: constants.DISK_RDWR,
135
      } for val in opts.disks.split(",")]
136
  except errors.UnitParseError, err:
137
    ToStderr("Invalid disks parameter '%s': %s", opts.disks, err)
138
    return 1
139

    
140
  nics = [val.split("/") for val in opts.nics.split(",")]
141
  for row in nics:
142
    while len(row) < 3:
143
      row.append(None)
144
    for i in range(3):
145
      if row[i] == '':
146
        row[i] = None
147
  nic_dict = [{
148
    constants.INIC_MAC: v[0],
149
    constants.INIC_IP: v[1],
150
    # The iallocator interface defines a "bridge" item
151
    "bridge": v[2],
152
    } for v in nics]
153

    
154
  if opts.tags is None:
155
    opts.tags = []
156
  else:
157
    opts.tags = opts.tags.split(",")
158

    
159
  op = opcodes.OpTestAllocator(mode=opts.mode,
160
                               name=args[0],
161
                               evac_nodes=args,
162
                               instances=args,
163
                               mem_size=opts.mem,
164
                               disks=disks,
165
                               disk_template=opts.disk_template,
166
                               nics=nic_dict,
167
                               os=opts.os,
168
                               vcpus=opts.vcpus,
169
                               tags=opts.tags,
170
                               direction=opts.direction,
171
                               allocator=opts.iallocator,
172
                               reloc_mode=opts.reloc_mode,
173
                               target_groups=opts.target_groups)
174
  result = SubmitOpCode(op, opts=opts)
175
  ToStdout("%s" % result)
176
  return 0
177

    
178

    
179
def _TestJobSubmission(opts):
180
  """Tests submitting jobs.
181

182
  """
183
  ToStdout("Testing job submission")
184

    
185
  testdata = [
186
    (0, 0, constants.OP_PRIO_LOWEST),
187
    (0, 0, constants.OP_PRIO_HIGHEST),
188
    ]
189

    
190
  for priority in (constants.OP_PRIO_SUBMIT_VALID |
191
                   frozenset([constants.OP_PRIO_LOWEST,
192
                              constants.OP_PRIO_HIGHEST])):
193
    for offset in [-1, +1]:
194
      testdata.extend([
195
        (0, 0, priority + offset),
196
        (3, 0, priority + offset),
197
        (0, 3, priority + offset),
198
        (4, 2, priority + offset),
199
        ])
200

    
201
  cl = cli.GetClient()
202

    
203
  for before, after, failpriority in testdata:
204
    ops = []
205
    ops.extend([opcodes.OpTestDelay(duration=0) for _ in range(before)])
206
    ops.append(opcodes.OpTestDelay(duration=0, priority=failpriority))
207
    ops.extend([opcodes.OpTestDelay(duration=0) for _ in range(after)])
208

    
209
    try:
210
      cl.SubmitJob(ops)
211
    except errors.GenericError, err:
212
      if opts.debug:
213
        ToStdout("Ignoring error: %s", err)
214
    else:
215
      raise errors.OpExecError("Submitting opcode with priority %s did not"
216
                               " fail when it should (allowed are %s)" %
217
                               (failpriority, constants.OP_PRIO_SUBMIT_VALID))
218

    
219
    jobs = [
220
      [opcodes.OpTestDelay(duration=0),
221
       opcodes.OpTestDelay(duration=0, dry_run=False),
222
       opcodes.OpTestDelay(duration=0, dry_run=True)],
223
      ops,
224
      ]
225
    result = cl.SubmitManyJobs(jobs)
226
    if not (len(result) == 2 and
227
            compat.all(len(i) == 2 for i in result) and
228
            compat.all(isinstance(i[1], basestring) for i in result) and
229
            result[0][0] and not result[1][0]):
230
      raise errors.OpExecError("Submitting multiple jobs did not work as"
231
                               " expected, result %s" % result)
232
    assert len(result) == 2
233

    
234
  ToStdout("Job submission tests were successful")
235

    
236

    
237
class _JobQueueTestReporter(cli.StdioJobPollReportCb):
238
  def __init__(self):
239
    """Initializes this class.
240

241
    """
242
    cli.StdioJobPollReportCb.__init__(self)
243
    self._expected_msgcount = 0
244
    self._all_testmsgs = []
245
    self._testmsgs = None
246
    self._job_id = None
247

    
248
  def GetTestMessages(self):
249
    """Returns all test log messages received so far.
250

251
    """
252
    return self._all_testmsgs
253

    
254
  def GetJobId(self):
255
    """Returns the job ID.
256

257
    """
258
    return self._job_id
259

    
260
  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
261
    """Handles a log message.
262

263
    """
264
    if self._job_id is None:
265
      self._job_id = job_id
266
    elif self._job_id != job_id:
267
      raise errors.ProgrammerError("The same reporter instance was used for"
268
                                   " more than one job")
269

    
270
    if log_type == constants.ELOG_JQUEUE_TEST:
271
      (sockname, test, arg) = log_msg
272
      return self._ProcessTestMessage(job_id, sockname, test, arg)
273

    
274
    elif (log_type == constants.ELOG_MESSAGE and
275
          log_msg.startswith(constants.JQT_MSGPREFIX)):
276
      if self._testmsgs is None:
277
        raise errors.OpExecError("Received test message without a preceding"
278
                                 " start message")
279
      testmsg = log_msg[len(constants.JQT_MSGPREFIX):]
280
      self._testmsgs.append(testmsg)
281
      self._all_testmsgs.append(testmsg)
282
      return
283

    
284
    return cli.StdioJobPollReportCb.ReportLogMessage(self, job_id, serial,
285
                                                     timestamp, log_type,
286
                                                     log_msg)
287

    
288
  def _ProcessTestMessage(self, job_id, sockname, test, arg):
289
    """Handles a job queue test message.
290

291
    """
292
    if test not in constants.JQT_ALL:
293
      raise errors.OpExecError("Received invalid test message %s" % test)
294

    
295
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
296
    try:
297
      sock.settimeout(30.0)
298

    
299
      logging.debug("Connecting to %s", sockname)
300
      sock.connect(sockname)
301

    
302
      logging.debug("Checking status")
303
      jobdetails = cli.GetClient().QueryJobs([job_id], ["status"])[0]
304
      if not jobdetails:
305
        raise errors.OpExecError("Can't find job %s" % job_id)
306

    
307
      status = jobdetails[0]
308

    
309
      logging.debug("Status of job %s is %s", job_id, status)
310

    
311
      if test == constants.JQT_EXPANDNAMES:
312
        if status != constants.JOB_STATUS_WAITLOCK:
313
          raise errors.OpExecError("Job status while expanding names is '%s',"
314
                                   " not '%s' as expected" %
315
                                   (status, constants.JOB_STATUS_WAITLOCK))
316
      elif test in (constants.JQT_EXEC, constants.JQT_LOGMSG):
317
        if status != constants.JOB_STATUS_RUNNING:
318
          raise errors.OpExecError("Job status while executing opcode is '%s',"
319
                                   " not '%s' as expected" %
320
                                   (status, constants.JOB_STATUS_RUNNING))
321

    
322
      if test == constants.JQT_STARTMSG:
323
        logging.debug("Expecting %s test messages", arg)
324
        self._testmsgs = []
325
      elif test == constants.JQT_LOGMSG:
326
        if len(self._testmsgs) != arg:
327
          raise errors.OpExecError("Received %s test messages when %s are"
328
                                   " expected" % (len(self._testmsgs), arg))
329
    finally:
330
      logging.debug("Closing socket")
331
      sock.close()
332

    
333

    
334
def TestJobqueue(opts, _):
335
  """Runs a few tests on the job queue.
336

337
  """
338
  _TestJobSubmission(opts)
339

    
340
  (TM_SUCCESS,
341
   TM_MULTISUCCESS,
342
   TM_FAIL,
343
   TM_PARTFAIL) = range(4)
344
  TM_ALL = frozenset([TM_SUCCESS, TM_MULTISUCCESS, TM_FAIL, TM_PARTFAIL])
345

    
346
  for mode in TM_ALL:
347
    test_messages = [
348
      "Testing mode %s" % mode,
349
      "Hello World",
350
      "A",
351
      "",
352
      "B"
353
      "Foo|bar|baz",
354
      utils.TimestampForFilename(),
355
      ]
356

    
357
    fail = mode in (TM_FAIL, TM_PARTFAIL)
358

    
359
    if mode == TM_PARTFAIL:
360
      ToStdout("Testing partial job failure")
361
      ops = [
362
        opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
363
                             log_messages=test_messages, fail=False),
364
        opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
365
                             log_messages=test_messages, fail=False),
366
        opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
367
                             log_messages=test_messages, fail=True),
368
        opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
369
                             log_messages=test_messages, fail=False),
370
        ]
371
      expect_messages = 3 * [test_messages]
372
      expect_opstatus = [
373
        constants.OP_STATUS_SUCCESS,
374
        constants.OP_STATUS_SUCCESS,
375
        constants.OP_STATUS_ERROR,
376
        constants.OP_STATUS_ERROR,
377
        ]
378
      expect_resultlen = 2
379
    elif mode == TM_MULTISUCCESS:
380
      ToStdout("Testing multiple successful opcodes")
381
      ops = [
382
        opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
383
                             log_messages=test_messages, fail=False),
384
        opcodes.OpTestJqueue(notify_waitlock=True, notify_exec=True,
385
                             log_messages=test_messages, fail=False),
386
        ]
387
      expect_messages = 2 * [test_messages]
388
      expect_opstatus = [
389
        constants.OP_STATUS_SUCCESS,
390
        constants.OP_STATUS_SUCCESS,
391
        ]
392
      expect_resultlen = 2
393
    else:
394
      if mode == TM_SUCCESS:
395
        ToStdout("Testing job success")
396
        expect_opstatus = [constants.OP_STATUS_SUCCESS]
397
      elif mode == TM_FAIL:
398
        ToStdout("Testing job failure")
399
        expect_opstatus = [constants.OP_STATUS_ERROR]
400
      else:
401
        raise errors.ProgrammerError("Unknown test mode %s" % mode)
402

    
403
      ops = [
404
        opcodes.OpTestJqueue(notify_waitlock=True,
405
                             notify_exec=True,
406
                             log_messages=test_messages,
407
                             fail=fail)
408
        ]
409
      expect_messages = [test_messages]
410
      expect_resultlen = 1
411

    
412
    cl = cli.GetClient()
413
    cli.SetGenericOpcodeOpts(ops, opts)
414

    
415
    # Send job to master daemon
416
    job_id = cli.SendJob(ops, cl=cl)
417

    
418
    reporter = _JobQueueTestReporter()
419
    results = None
420

    
421
    try:
422
      results = cli.PollJob(job_id, cl=cl, reporter=reporter)
423
    except errors.OpExecError, err:
424
      if not fail:
425
        raise
426
      ToStdout("Ignoring error: %s", err)
427
    else:
428
      if fail:
429
        raise errors.OpExecError("Job didn't fail when it should")
430

    
431
    # Check length of result
432
    if fail:
433
      if results is not None:
434
        raise errors.OpExecError("Received result from failed job")
435
    elif len(results) != expect_resultlen:
436
      raise errors.OpExecError("Received %s results (%s), expected %s" %
437
                               (len(results), results, expect_resultlen))
438

    
439
    # Check received log messages
440
    all_messages = [i for j in expect_messages for i in j]
441
    if reporter.GetTestMessages() != all_messages:
442
      raise errors.OpExecError("Received test messages don't match input"
443
                               " (input %r, received %r)" %
444
                               (all_messages, reporter.GetTestMessages()))
445

    
446
    # Check final status
447
    reported_job_id = reporter.GetJobId()
448
    if reported_job_id != job_id:
449
      raise errors.OpExecError("Reported job ID %s doesn't match"
450
                               "submission job ID %s" %
451
                               (reported_job_id, job_id))
452

    
453
    jobdetails = cli.GetClient().QueryJobs([job_id], ["status", "opstatus"])[0]
454
    if not jobdetails:
455
      raise errors.OpExecError("Can't find job %s" % job_id)
456

    
457
    if fail:
458
      exp_status = constants.JOB_STATUS_ERROR
459
    else:
460
      exp_status = constants.JOB_STATUS_SUCCESS
461

    
462
    (final_status, final_opstatus) = jobdetails
463
    if final_status != exp_status:
464
      raise errors.OpExecError("Final job status is %s, not %s as expected" %
465
                               (final_status, exp_status))
466
    if len(final_opstatus) != len(ops):
467
      raise errors.OpExecError("Did not receive status for all opcodes (got %s,"
468
                               " expected %s)" %
469
                               (len(final_opstatus), len(ops)))
470
    if final_opstatus != expect_opstatus:
471
      raise errors.OpExecError("Opcode status is %s, expected %s" %
472
                               (final_opstatus, expect_opstatus))
473

    
474
  ToStdout("Job queue test successful")
475

    
476
  return 0
477

    
478

    
479
def ListLocks(opts, args): # pylint: disable-msg=W0613
480
  """List all locks.
481

482
  @param opts: the command line options selected by the user
483
  @type args: list
484
  @param args: should be an empty list
485
  @rtype: int
486
  @return: the desired exit code
487

488
  """
489
  selected_fields = ParseFields(opts.output, _LIST_LOCKS_DEF_FIELDS)
490

    
491
  def _DashIfNone(fn):
492
    def wrapper(value):
493
      if not value:
494
        return "-"
495
      return fn(value)
496
    return wrapper
497

    
498
  def _FormatPending(value):
499
    """Format pending acquires.
500

501
    """
502
    return utils.CommaJoin("%s:%s" % (mode, ",".join(threads))
503
                           for mode, threads in value)
504

    
505
  # Format raw values
506
  fmtoverride = {
507
    "mode": (_DashIfNone(str), False),
508
    "owner": (_DashIfNone(",".join), False),
509
    "pending": (_DashIfNone(_FormatPending), False),
510
    }
511

    
512
  while True:
513
    ret = GenericList(constants.QR_LOCK, selected_fields, None, None,
514
                      opts.separator, not opts.no_headers,
515
                      format_override=fmtoverride, verbose=opts.verbose)
516

    
517
    if ret != constants.EXIT_SUCCESS:
518
      return ret
519

    
520
    if not opts.interval:
521
      break
522

    
523
    ToStdout("")
524
    time.sleep(opts.interval)
525

    
526
  return 0
527

    
528

    
529
commands = {
530
  'delay': (
531
    Delay, [ArgUnknown(min=1, max=1)],
532
    [cli_option("--no-master", dest="on_master", default=True,
533
                action="store_false", help="Do not sleep in the master code"),
534
     cli_option("-n", dest="on_nodes", default=[],
535
                action="append", help="Select nodes to sleep on"),
536
     cli_option("-r", "--repeat", type="int", default="0", dest="repeat",
537
                help="Number of times to repeat the sleep"),
538
     DRY_RUN_OPT, PRIORITY_OPT,
539
     ],
540
    "[opts...] <duration>", "Executes a TestDelay OpCode"),
541
  'submit-job': (
542
    GenericOpCodes, [ArgFile(min=1)],
543
    [VERBOSE_OPT,
544
     cli_option("--op-repeat", type="int", default="1", dest="rep_op",
545
                help="Repeat the opcode sequence this number of times"),
546
     cli_option("--job-repeat", type="int", default="1", dest="rep_job",
547
                help="Repeat the job this number of times"),
548
     cli_option("--timing-stats", default=False,
549
                action="store_true", help="Show timing stats"),
550
     cli_option("--each", default=False, action="store_true",
551
                help="Submit each job separately"),
552
     DRY_RUN_OPT, PRIORITY_OPT,
553
     ],
554
    "<op_list_file...>", "Submits jobs built from json files"
555
    " containing a list of serialized opcodes"),
556
  'allocator': (
557
    TestAllocator, [ArgUnknown(min=1)],
558
    [cli_option("--dir", dest="direction", default=constants.IALLOCATOR_DIR_IN,
559
                choices=list(constants.VALID_IALLOCATOR_DIRECTIONS),
560
                help="Show allocator input (in) or allocator"
561
                " results (out)"),
562
     IALLOCATOR_OPT,
563
     cli_option("-m", "--mode", default="relocate",
564
                choices=list(constants.VALID_IALLOCATOR_MODES),
565
                help=("Request mode (one of %s)" %
566
                      utils.CommaJoin(constants.VALID_IALLOCATOR_MODES))),
567
     cli_option("--mem", default=128, type="unit",
568
                help="Memory size for the instance (MiB)"),
569
     cli_option("--disks", default="4096,4096",
570
                help="Comma separated list of disk sizes (MiB)"),
571
     DISK_TEMPLATE_OPT,
572
     cli_option("--nics", default="00:11:22:33:44:55",
573
                help="Comma separated list of nics, each nic"
574
                " definition is of form mac/ip/bridge, if"
575
                " missing values are replace by None"),
576
     OS_OPT,
577
     cli_option("-p", "--vcpus", default=1, type="int",
578
                help="Select number of VCPUs for the instance"),
579
     cli_option("--tags", default=None,
580
                help="Comma separated list of tags"),
581
     cli_option("--reloc-mode", default=constants.IALLOCATOR_MRELOC_ANY,
582
                choices=list(constants.IALLOCATOR_MRELOC_MODES),
583
                help=("Instance relocation mode (one of %s)" %
584
                      utils.CommaJoin(constants.IALLOCATOR_MRELOC_MODES))),
585
     cli_option("--target-groups", help="Target groups for relocation"),
586
     DRY_RUN_OPT, PRIORITY_OPT,
587
     ],
588
    "{opts...} <instance>", "Executes a TestAllocator OpCode"),
589
  "test-jobqueue": (
590
    TestJobqueue, ARGS_NONE, [PRIORITY_OPT],
591
    "", "Test a few aspects of the job queue"),
592
  "locks": (
593
    ListLocks, ARGS_NONE,
594
    [NOHDR_OPT, SEP_OPT, FIELDS_OPT, INTERVAL_OPT, VERBOSE_OPT],
595
    "[--interval N]", "Show a list of locks in the master daemon"),
596
  }
597

    
598

    
599
def Main():
600
  return GenericMain(commands)