Statistics
| Branch: | Tag: | Revision:

root / scripts / gnt-debug @ db5a8a2d

History | View | Annotate | Download (15.3 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Debugging commands"""
22

    
23
# pylint: disable-msg=W0401,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0614: Unused import %s from wildcard import (since we need cli)
26
# C0103: Invalid name gnt-backup
27

    
28
import sys
29
import simplejson
30
import time
31
import socket
32
import logging
33

    
34
from ganeti.cli import *
35
from ganeti import cli
36
from ganeti import constants
37
from ganeti import opcodes
38
from ganeti import utils
39
from ganeti import errors
40

    
41

    
42
def Delay(opts, args):
43
  """Sleeps for a while
44

    
45
  @param opts: the command line options selected by the user
46
  @type args: list
47
  @param args: should contain only one element, the duration
48
      the sleep
49
  @rtype: int
50
  @return: the desired exit code
51

    
52
  """
53
  delay = float(args[0])
54
  op = opcodes.OpTestDelay(duration=delay,
55
                           on_master=opts.on_master,
56
                           on_nodes=opts.on_nodes,
57
                           repeat=opts.repeat)
58
  SubmitOpCode(op, opts=opts)
59

    
60
  return 0
61

    
62

    
63
def GenericOpCodes(opts, args):
64
  """Send any opcode to the master.
65

    
66
  @param opts: the command line options selected by the user
67
  @type args: list
68
  @param args: should contain only one element, the path of
69
      the file with the opcode definition
70
  @rtype: int
71
  @return: the desired exit code
72

    
73
  """
74
  cl = cli.GetClient()
75
  jex = cli.JobExecutor(cl=cl, verbose=opts.verbose, opts=opts)
76

    
77
  job_cnt = 0
78
  op_cnt = 0
79
  if opts.timing_stats:
80
    ToStdout("Loading...")
81
  for job_idx in range(opts.rep_job):
82
    for fname in args:
83
      # pylint: disable-msg=W0142
84
      op_data = simplejson.loads(utils.ReadFile(fname))
85
      op_list = [opcodes.OpCode.LoadOpCode(val) for val in op_data]
86
      op_list = op_list * opts.rep_op
87
      jex.QueueJob("file %s/%d" % (fname, job_idx), *op_list)
88
      op_cnt += len(op_list)
89
      job_cnt += 1
90

    
91
  if opts.timing_stats:
92
    t1 = time.time()
93
    ToStdout("Submitting...")
94

    
95
  jex.SubmitPending(each=opts.each)
96

    
97
  if opts.timing_stats:
98
    t2 = time.time()
99
    ToStdout("Executing...")
100

    
101
  jex.GetResults()
102
  if opts.timing_stats:
103
    t3 = time.time()
104
    ToStdout("C:op     %4d" % op_cnt)
105
    ToStdout("C:job    %4d" % job_cnt)
106
    ToStdout("T:submit %4.4f" % (t2-t1))
107
    ToStdout("T:exec   %4.4f" % (t3-t2))
108
    ToStdout("T:total  %4.4f" % (t3-t1))
109
  return 0
110

    
111

    
112
def TestAllocator(opts, args):
113
  """Runs the test allocator opcode.
114

    
115
  @param opts: the command line options selected by the user
116
  @type args: list
117
  @param args: should contain only one element, the iallocator name
118
  @rtype: int
119
  @return: the desired exit code
120

    
121
  """
122
  try:
123
    disks = [{"size": utils.ParseUnit(val), "mode": 'w'}
124
             for val in opts.disks.split(",")]
125
  except errors.UnitParseError, err:
126
    ToStderr("Invalid disks parameter '%s': %s", opts.disks, err)
127
    return 1
128

    
129
  nics = [val.split("/") for val in opts.nics.split(",")]
130
  for row in nics:
131
    while len(row) < 3:
132
      row.append(None)
133
    for i in range(3):
134
      if row[i] == '':
135
        row[i] = None
136
  nic_dict = [{"mac": v[0], "ip": v[1], "bridge": v[2]} for v in nics]
137

    
138
  if opts.tags is None:
139
    opts.tags = []
140
  else:
141
    opts.tags = opts.tags.split(",")
142

    
143
  op = opcodes.OpTestAllocator(mode=opts.mode,
144
                               name=args[0],
145
                               evac_nodes=args,
146
                               mem_size=opts.mem,
147
                               disks=disks,
148
                               disk_template=opts.disk_template,
149
                               nics=nic_dict,
150
                               os=opts.os,
151
                               vcpus=opts.vcpus,
152
                               tags=opts.tags,
153
                               direction=opts.direction,
154
                               allocator=opts.iallocator,
155
                               )
156
  result = SubmitOpCode(op, opts=opts)
157
  ToStdout("%s" % result)
158
  return 0
159

    
160

    
161
class _JobQueueTestReporter(cli.StdioJobPollReportCb):
162
  def __init__(self):
163
    """Initializes this class.
164

    
165
    """
166
    cli.StdioJobPollReportCb.__init__(self)
167
    self._expected_msgcount = 0
168
    self._all_testmsgs = []
169
    self._testmsgs = None
170
    self._job_id = None
171

    
172
  def GetTestMessages(self):
173
    """Returns all test log messages received so far.
174

    
175
    """
176
    return self._all_testmsgs
177

    
178
  def GetJobId(self):
179
    """Returns the job ID.
180

    
181
    """
182
    return self._job_id
183

    
184
  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
185
    """Handles a log message.
186

    
187
    """
188
    if self._job_id is None:
189
      self._job_id = job_id
190
    elif self._job_id != job_id:
191
      raise errors.ProgrammerError("The same reporter instance was used for"
192
                                   " more than one job")
193

    
194
    if log_type == constants.ELOG_JQUEUE_TEST:
195
      (sockname, test, arg) = log_msg
196
      return self._ProcessTestMessage(job_id, sockname, test, arg)
197

    
198
    elif (log_type == constants.ELOG_MESSAGE and
199
          log_msg.startswith(constants.JQT_MSGPREFIX)):
200
      if self._testmsgs is None:
201
        raise errors.OpExecError("Received test message without a preceding"
202
                                 " start message")
203
      testmsg = log_msg[len(constants.JQT_MSGPREFIX):]
204
      self._testmsgs.append(testmsg)
205
      self._all_testmsgs.append(testmsg)
206
      return
207

    
208
    return cli.StdioJobPollReportCb.ReportLogMessage(self, job_id, serial,
209
                                                     timestamp, log_type,
210
                                                     log_msg)
211

    
212
  def _ProcessTestMessage(self, job_id, sockname, test, arg):
213
    """Handles a job queue test message.
214

    
215
    """
216
    if test not in constants.JQT_ALL:
217
      raise errors.OpExecError("Received invalid test message %s" % test)
218

    
219
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
220
    try:
221
      sock.settimeout(30.0)
222

    
223
      logging.debug("Connecting to %s", sockname)
224
      sock.connect(sockname)
225

    
226
      logging.debug("Checking status")
227
      jobdetails = cli.GetClient().QueryJobs([job_id], ["status"])[0]
228
      if not jobdetails:
229
        raise errors.OpExecError("Can't find job %s" % job_id)
230

    
231
      status = jobdetails[0]
232

    
233
      logging.debug("Status of job %s is %s", job_id, status)
234

    
235
      if test == constants.JQT_EXPANDNAMES:
236
        if status != constants.JOB_STATUS_WAITLOCK:
237
          raise errors.OpExecError("Job status while expanding names is '%s',"
238
                                   " not '%s' as expected" %
239
                                   (status, constants.JOB_STATUS_WAITLOCK))
240
      elif test in (constants.JQT_EXEC, constants.JQT_LOGMSG):
241
        if status != constants.JOB_STATUS_RUNNING:
242
          raise errors.OpExecError("Job status while executing opcode is '%s',"
243
                                   " not '%s' as expected" %
244
                                   (status, constants.JOB_STATUS_RUNNING))
245

    
246
      if test == constants.JQT_STARTMSG:
247
        logging.debug("Expecting %s test messages", arg)
248
        self._testmsgs = []
249
      elif test == constants.JQT_LOGMSG:
250
        if len(self._testmsgs) != arg:
251
          raise errors.OpExecError("Received %s test messages when %s are"
252
                                   " expected" % (len(self._testmsgs), arg))
253
    finally:
254
      logging.debug("Closing socket")
255
      sock.close()
256

    
257

    
258
def TestJobqueue(opts, _):
259
  """Runs a few tests on the job queue.
260

    
261
  """
262
  (TM_SUCCESS,
263
   TM_MULTISUCCESS,
264
   TM_FAIL,
265
   TM_PARTFAIL) = range(4)
266
  TM_ALL = frozenset([TM_SUCCESS, TM_MULTISUCCESS, TM_FAIL, TM_PARTFAIL])
267

    
268
  for mode in TM_ALL:
269
    test_messages = [
270
      "Testing mode %s" % mode,
271
      "Hello World",
272
      "A",
273
      "",
274
      "B"
275
      "Foo|bar|baz",
276
      utils.TimestampForFilename(),
277
      ]
278

    
279
    fail = mode in (TM_FAIL, TM_PARTFAIL)
280

    
281
    if mode == TM_PARTFAIL:
282
      ToStdout("Testing partial job failure")
283
      ops = [
284
        opcodes.OpTestJobqueue(notify_waitlock=True, notify_exec=True,
285
                               log_messages=test_messages, fail=False),
286
        opcodes.OpTestJobqueue(notify_waitlock=True, notify_exec=True,
287
                               log_messages=test_messages, fail=False),
288
        opcodes.OpTestJobqueue(notify_waitlock=True, notify_exec=True,
289
                               log_messages=test_messages, fail=True),
290
        opcodes.OpTestJobqueue(notify_waitlock=True, notify_exec=True,
291
                               log_messages=test_messages, fail=False),
292
        ]
293
      expect_messages = 3 * [test_messages]
294
      expect_opstatus = [
295
        constants.OP_STATUS_SUCCESS,
296
        constants.OP_STATUS_SUCCESS,
297
        constants.OP_STATUS_ERROR,
298
        constants.OP_STATUS_ERROR,
299
        ]
300
      expect_resultlen = 2
301
    elif mode == TM_MULTISUCCESS:
302
      ToStdout("Testing multiple successful opcodes")
303
      ops = [
304
        opcodes.OpTestJobqueue(notify_waitlock=True, notify_exec=True,
305
                               log_messages=test_messages, fail=False),
306
        opcodes.OpTestJobqueue(notify_waitlock=True, notify_exec=True,
307
                               log_messages=test_messages, fail=False),
308
        ]
309
      expect_messages = 2 * [test_messages]
310
      expect_opstatus = [
311
        constants.OP_STATUS_SUCCESS,
312
        constants.OP_STATUS_SUCCESS,
313
        ]
314
      expect_resultlen = 2
315
    else:
316
      if mode == TM_SUCCESS:
317
        ToStdout("Testing job success")
318
        expect_opstatus = [constants.OP_STATUS_SUCCESS]
319
      elif mode == TM_FAIL:
320
        ToStdout("Testing job failure")
321
        expect_opstatus = [constants.OP_STATUS_ERROR]
322
      else:
323
        raise errors.ProgrammerError("Unknown test mode %s" % mode)
324

    
325
      ops = [
326
        opcodes.OpTestJobqueue(notify_waitlock=True,
327
                               notify_exec=True,
328
                               log_messages=test_messages,
329
                               fail=fail)
330
        ]
331
      expect_messages = [test_messages]
332
      expect_resultlen = 1
333

    
334
    cl = cli.GetClient()
335
    cli.SetGenericOpcodeOpts(ops, opts)
336

    
337
    # Send job to master daemon
338
    job_id = cli.SendJob(ops, cl=cl)
339

    
340
    reporter = _JobQueueTestReporter()
341
    results = None
342

    
343
    try:
344
      results = cli.PollJob(job_id, cl=cl, reporter=reporter)
345
    except errors.OpExecError, err:
346
      if not fail:
347
        raise
348
      ToStdout("Ignoring error: %s", err)
349
    else:
350
      if fail:
351
        raise errors.OpExecError("Job didn't fail when it should")
352

    
353
    # Check length of result
354
    if fail:
355
      if results is not None:
356
        raise errors.OpExecError("Received result from failed job")
357
    elif len(results) != expect_resultlen:
358
      raise errors.OpExecError("Received %s results (%s), expected %s" %
359
                               (len(results), results, expect_resultlen))
360

    
361
    # Check received log messages
362
    all_messages = [i for j in expect_messages for i in j]
363
    if reporter.GetTestMessages() != all_messages:
364
      raise errors.OpExecError("Received test messages don't match input"
365
                               " (input %r, received %r)" %
366
                               (all_messages, reporter.GetTestMessages()))
367

    
368
    # Check final status
369
    reported_job_id = reporter.GetJobId()
370
    if reported_job_id != job_id:
371
      raise errors.OpExecError("Reported job ID %s doesn't match"
372
                               "submission job ID %s" %
373
                               (reported_job_id, job_id))
374

    
375
    jobdetails = cli.GetClient().QueryJobs([job_id], ["status", "opstatus"])[0]
376
    if not jobdetails:
377
      raise errors.OpExecError("Can't find job %s" % job_id)
378

    
379
    if fail:
380
      exp_status = constants.JOB_STATUS_ERROR
381
    else:
382
      exp_status = constants.JOB_STATUS_SUCCESS
383

    
384
    (final_status, final_opstatus) = jobdetails
385
    if final_status != exp_status:
386
      raise errors.OpExecError("Final job status is %s, not %s as expected" %
387
                               (final_status, exp_status))
388
    if len(final_opstatus) != len(ops):
389
      raise errors.OpExecError("Did not receive status for all opcodes (got %s,"
390
                               " expected %s)" %
391
                               (len(final_opstatus), len(ops)))
392
    if final_opstatus != expect_opstatus:
393
      raise errors.OpExecError("Opcode status is %s, expected %s" %
394
                               (final_opstatus, expect_opstatus))
395

    
396
  ToStdout("Job queue test successful")
397

    
398
  return 0
399

    
400

    
401
commands = {
402
  'delay': (
403
    Delay, [ArgUnknown(min=1, max=1)],
404
    [cli_option("--no-master", dest="on_master", default=True,
405
                action="store_false", help="Do not sleep in the master code"),
406
     cli_option("-n", dest="on_nodes", default=[],
407
                action="append", help="Select nodes to sleep on"),
408
     cli_option("-r", "--repeat", type="int", default="0", dest="repeat",
409
                help="Number of times to repeat the sleep"),
410
     DRY_RUN_OPT,
411
     ],
412
    "[opts...] <duration>", "Executes a TestDelay OpCode"),
413
  'submit-job': (
414
    GenericOpCodes, [ArgFile(min=1)],
415
    [VERBOSE_OPT,
416
     cli_option("--op-repeat", type="int", default="1", dest="rep_op",
417
                help="Repeat the opcode sequence this number of times"),
418
     cli_option("--job-repeat", type="int", default="1", dest="rep_job",
419
                help="Repeat the job this number of times"),
420
     cli_option("--timing-stats", default=False,
421
                action="store_true", help="Show timing stats"),
422
     cli_option("--each", default=False, action="store_true",
423
                help="Submit each job separately"),
424
     DRY_RUN_OPT,
425
     ],
426
    "<op_list_file...>", "Submits jobs built from json files"
427
    " containing a list of serialized opcodes"),
428
  'allocator': (
429
    TestAllocator, [ArgUnknown(min=1)],
430
    [cli_option("--dir", dest="direction",
431
                default="in", choices=["in", "out"],
432
                help="Show allocator input (in) or allocator"
433
                " results (out)"),
434
     IALLOCATOR_OPT,
435
     cli_option("-m", "--mode", default="relocate",
436
                choices=["relocate", "allocate", "multi-evacuate"],
437
                help="Request mode, either allocate or relocate"),
438
     cli_option("--mem", default=128, type="unit",
439
                help="Memory size for the instance (MiB)"),
440
     cli_option("--disks", default="4096,4096",
441
                help="Comma separated list of disk sizes (MiB)"),
442
     DISK_TEMPLATE_OPT,
443
     cli_option("--nics", default="00:11:22:33:44:55",
444
                help="Comma separated list of nics, each nic"
445
                " definition is of form mac/ip/bridge, if"
446
                " missing values are replace by None"),
447
     OS_OPT,
448
     cli_option("-p", "--vcpus", default=1, type="int",
449
                help="Select number of VCPUs for the instance"),
450
     cli_option("--tags", default=None,
451
                help="Comma separated list of tags"),
452
     DRY_RUN_OPT,
453
     ],
454
    "{opts...} <instance>", "Executes a TestAllocator OpCode"),
455
  "test-jobqueue": (
456
    TestJobqueue, ARGS_NONE, [],
457
    "", "Test a few aspects of the job queue")
458
  }
459

    
460

    
461
if __name__ == '__main__':
462
  sys.exit(GenericMain(commands))