Revision 4e338533

b/lib/cli.py
1223 1223
  return job_id
1224 1224

  
1225 1225

  
1226
def PollJob(job_id, cl=None, feedback_fn=None):
1227
  """Function to poll for the result of a job.
1226
def GenericPollJob(job_id, cbs, report_cbs):
1227
  """Generic job-polling function.
1228 1228

  
1229
  @type job_id: job identified
1230
  @param job_id: the job to poll for results
1231
  @type cl: luxi.Client
1232
  @param cl: the luxi client to use for communicating with the master;
1233
             if None, a new client will be created
1229
  @type job_id: number
1230
  @param job_id: Job ID
1231
  @type cbs: Instance of L{JobPollCbBase}
1232
  @param cbs: Data callbacks
1233
  @type report_cbs: Instance of L{JobPollReportCbBase}
1234
  @param report_cbs: Reporting callbacks
1234 1235

  
1235 1236
  """
1236
  if cl is None:
1237
    cl = GetClient()
1238

  
1239 1237
  prev_job_info = None
1240 1238
  prev_logmsg_serial = None
1241 1239

  
1242 1240
  status = None
1243 1241

  
1244
  notified_queued = False
1245
  notified_waitlock = False
1246

  
1247 1242
  while True:
1248
    result = cl.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
1249
                                     prev_logmsg_serial)
1243
    result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
1244
                                      prev_logmsg_serial)
1250 1245
    if not result:
1251 1246
      # job not found, go away!
1252 1247
      raise errors.JobLost("Job with id %s lost" % job_id)
1253
    elif result == constants.JOB_NOTCHANGED:
1254
      if status is not None and not callable(feedback_fn):
1255
        if status == constants.JOB_STATUS_QUEUED and not notified_queued:
1256
          ToStderr("Job %s is waiting in queue", job_id)
1257
          notified_queued = True
1258
        elif status == constants.JOB_STATUS_WAITLOCK and not notified_waitlock:
1259
          ToStderr("Job %s is trying to acquire all necessary locks", job_id)
1260
          notified_waitlock = True
1248

  
1249
    if result == constants.JOB_NOTCHANGED:
1250
      report_cbs.ReportNotChanged(job_id, status)
1261 1251

  
1262 1252
      # Wait again
1263 1253
      continue
......
1268 1258

  
1269 1259
    if log_entries:
1270 1260
      for log_entry in log_entries:
1271
        (serial, timestamp, _, message) = log_entry
1272
        if callable(feedback_fn):
1273
          feedback_fn(log_entry[1:])
1274
        else:
1275
          encoded = utils.SafeEncode(message)
1276
          ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded)
1261
        (serial, timestamp, log_type, message) = log_entry
1262
        report_cbs.ReportLogMessage(job_id, serial, timestamp,
1263
                                    log_type, message)
1277 1264
        prev_logmsg_serial = max(prev_logmsg_serial, serial)
1278 1265

  
1279 1266
    # TODO: Handle canceled and archived jobs
......
1285 1272

  
1286 1273
    prev_job_info = job_info
1287 1274

  
1288
  jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
1275
  jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
1289 1276
  if not jobs:
1290 1277
    raise errors.JobLost("Job with id %s lost" % job_id)
1291 1278

  
1292 1279
  status, opstatus, result = jobs[0]
1280

  
1293 1281
  if status == constants.JOB_STATUS_SUCCESS:
1294 1282
    return result
1295
  elif status in (constants.JOB_STATUS_CANCELING,
1296
                  constants.JOB_STATUS_CANCELED):
1283

  
1284
  if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
1297 1285
    raise errors.OpExecError("Job was canceled")
1286

  
1287
  has_ok = False
1288
  for idx, (status, msg) in enumerate(zip(opstatus, result)):
1289
    if status == constants.OP_STATUS_SUCCESS:
1290
      has_ok = True
1291
    elif status == constants.OP_STATUS_ERROR:
1292
      errors.MaybeRaise(msg)
1293

  
1294
      if has_ok:
1295
        raise errors.OpExecError("partial failure (opcode %d): %s" %
1296
                                 (idx, msg))
1297

  
1298
      raise errors.OpExecError(str(msg))
1299

  
1300
  # default failure mode
1301
  raise errors.OpExecError(result)
1302

  
1303

  
1304
class JobPollCbBase:
1305
  """Base class for L{GenericPollJob} callbacks.
1306

  
1307
  """
1308
  def __init__(self):
1309
    """Initializes this class.
1310

  
1311
    """
1312

  
1313
  def WaitForJobChangeOnce(self, job_id, fields,
1314
                           prev_job_info, prev_log_serial):
1315
    """Waits for changes on a job.
1316

  
1317
    """
1318
    raise NotImplementedError()
1319

  
1320
  def QueryJobs(self, job_ids, fields):
1321
    """Returns the selected fields for the selected job IDs.
1322

  
1323
    @type job_ids: list of numbers
1324
    @param job_ids: Job IDs
1325
    @type fields: list of strings
1326
    @param fields: Fields
1327

  
1328
    """
1329
    raise NotImplementedError()
1330

  
1331

  
1332
class JobPollReportCbBase:
1333
  """Base class for L{GenericPollJob} reporting callbacks.
1334

  
1335
  """
1336
  def __init__(self):
1337
    """Initializes this class.
1338

  
1339
    """
1340

  
1341
  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1342
    """Handles a log message.
1343

  
1344
    """
1345
    raise NotImplementedError()
1346

  
1347
  def ReportNotChanged(self, job_id, status):
1348
    """Called for if a job hasn't changed in a while.
1349

  
1350
    @type job_id: number
1351
    @param job_id: Job ID
1352
    @type status: string or None
1353
    @param status: Job status if available
1354

  
1355
    """
1356
    raise NotImplementedError()
1357

  
1358

  
1359
class _LuxiJobPollCb(JobPollCbBase):
1360
  def __init__(self, cl):
1361
    """Initializes this class.
1362

  
1363
    """
1364
    JobPollCbBase.__init__(self)
1365
    self.cl = cl
1366

  
1367
  def WaitForJobChangeOnce(self, job_id, fields,
1368
                           prev_job_info, prev_log_serial):
1369
    """Waits for changes on a job.
1370

  
1371
    """
1372
    return self.cl.WaitForJobChangeOnce(job_id, fields,
1373
                                        prev_job_info, prev_log_serial)
1374

  
1375
  def QueryJobs(self, job_ids, fields):
1376
    """Returns the selected fields for the selected job IDs.
1377

  
1378
    """
1379
    return self.cl.QueryJobs(job_ids, fields)
1380

  
1381

  
1382
class FeedbackFnJobPollReportCb(JobPollReportCbBase):
1383
  def __init__(self, feedback_fn):
1384
    """Initializes this class.
1385

  
1386
    """
1387
    JobPollReportCbBase.__init__(self)
1388

  
1389
    self.feedback_fn = feedback_fn
1390

  
1391
    assert callable(feedback_fn)
1392

  
1393
  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1394
    """Handles a log message.
1395

  
1396
    """
1397
    self.feedback_fn((timestamp, log_type, log_msg))
1398

  
1399
  def ReportNotChanged(self, job_id, status):
1400
    """Called if a job hasn't changed in a while.
1401

  
1402
    """
1403
    # Ignore
1404

  
1405

  
1406
class StdioJobPollReportCb(JobPollReportCbBase):
1407
  def __init__(self):
1408
    """Initializes this class.
1409

  
1410
    """
1411
    JobPollReportCbBase.__init__(self)
1412

  
1413
    self.notified_queued = False
1414
    self.notified_waitlock = False
1415

  
1416
  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1417
    """Handles a log message.
1418

  
1419
    """
1420
    ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)),
1421
             utils.SafeEncode(log_msg))
1422

  
1423
  def ReportNotChanged(self, job_id, status):
1424
    """Called if a job hasn't changed in a while.
1425

  
1426
    """
1427
    if status is None:
1428
      return
1429

  
1430
    if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
1431
      ToStderr("Job %s is waiting in queue", job_id)
1432
      self.notified_queued = True
1433

  
1434
    elif status == constants.JOB_STATUS_WAITLOCK and not self.notified_waitlock:
1435
      ToStderr("Job %s is trying to acquire all necessary locks", job_id)
1436
      self.notified_waitlock = True
1437

  
1438

  
1439
def PollJob(job_id, cl=None, feedback_fn=None):
1440
  """Function to poll for the result of a job.
1441

  
1442
  @type job_id: job identified
1443
  @param job_id: the job to poll for results
1444
  @type cl: luxi.Client
1445
  @param cl: the luxi client to use for communicating with the master;
1446
             if None, a new client will be created
1447

  
1448
  """
1449
  if cl is None:
1450
    cl = GetClient()
1451

  
1452
  if feedback_fn:
1453
    reporter = FeedbackFnJobPollReportCb(feedback_fn)
1298 1454
  else:
1299
    has_ok = False
1300
    for idx, (status, msg) in enumerate(zip(opstatus, result)):
1301
      if status == constants.OP_STATUS_SUCCESS:
1302
        has_ok = True
1303
      elif status == constants.OP_STATUS_ERROR:
1304
        errors.MaybeRaise(msg)
1305
        if has_ok:
1306
          raise errors.OpExecError("partial failure (opcode %d): %s" %
1307
                                   (idx, msg))
1308
        else:
1309
          raise errors.OpExecError(str(msg))
1310
    # default failure mode
1311
    raise errors.OpExecError(result)
1455
    reporter = StdioJobPollReportCb()
1456

  
1457
  return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter)
1312 1458

  
1313 1459

  
1314 1460
def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
b/test/ganeti.cli_unittest.py
29 29

  
30 30
from ganeti import constants
31 31
from ganeti import cli
32
from ganeti import errors
33
from ganeti import utils
32 34
from ganeti.errors import OpPrereqError, ParameterError
33 35

  
34 36

  
......
100 102

  
101 103

  
102 104
class TestToStream(unittest.TestCase):
103
  """Thes the ToStream functions"""
105
  """Test the ToStream functions"""
104 106

  
105 107
  def testBasic(self):
106 108
    for data in ["foo",
......
246 248
               None, None, "m", exp)
247 249

  
248 250

  
251
class _MockJobPollCb(cli.JobPollCbBase, cli.JobPollReportCbBase):
252
  def __init__(self, tc, job_id):
253
    self.tc = tc
254
    self.job_id = job_id
255
    self._wfjcr = []
256
    self._jobstatus = []
257
    self._expect_notchanged = False
258
    self._expect_log = []
259

  
260
  def CheckEmpty(self):
261
    self.tc.assertFalse(self._wfjcr)
262
    self.tc.assertFalse(self._jobstatus)
263
    self.tc.assertFalse(self._expect_notchanged)
264
    self.tc.assertFalse(self._expect_log)
265

  
266
  def AddWfjcResult(self, *args):
267
    self._wfjcr.append(args)
268

  
269
  def AddQueryJobsResult(self, *args):
270
    self._jobstatus.append(args)
271

  
272
  def WaitForJobChangeOnce(self, job_id, fields,
273
                           prev_job_info, prev_log_serial):
274
    self.tc.assertEqual(job_id, self.job_id)
275
    self.tc.assertEqualValues(fields, ["status"])
276
    self.tc.assertFalse(self._expect_notchanged)
277
    self.tc.assertFalse(self._expect_log)
278

  
279
    (exp_prev_job_info, exp_prev_log_serial, result) = self._wfjcr.pop(0)
280
    self.tc.assertEqualValues(prev_job_info, exp_prev_job_info)
281
    self.tc.assertEqual(prev_log_serial, exp_prev_log_serial)
282

  
283
    if result == constants.JOB_NOTCHANGED:
284
      self._expect_notchanged = True
285
    elif result:
286
      (_, logmsgs) = result
287
      if logmsgs:
288
        self._expect_log.extend(logmsgs)
289

  
290
    return result
291

  
292
  def QueryJobs(self, job_ids, fields):
293
    self.tc.assertEqual(job_ids, [self.job_id])
294
    self.tc.assertEqualValues(fields, ["status", "opstatus", "opresult"])
295
    self.tc.assertFalse(self._expect_notchanged)
296
    self.tc.assertFalse(self._expect_log)
297

  
298
    result = self._jobstatus.pop(0)
299
    self.tc.assertEqual(len(fields), len(result))
300
    return [result]
301

  
302
  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
303
    self.tc.assertEqual(job_id, self.job_id)
304
    self.tc.assertEqualValues((serial, timestamp, log_type, log_msg),
305
                              self._expect_log.pop(0))
306

  
307
  def ReportNotChanged(self, job_id, status):
308
    self.tc.assertEqual(job_id, self.job_id)
309
    self.tc.assert_(self._expect_notchanged)
310
    self._expect_notchanged = False
311

  
312

  
313
class TestGenericPollJob(testutils.GanetiTestCase):
314
  def testSuccessWithLog(self):
315
    job_id = 29609
316
    cbs = _MockJobPollCb(self, job_id)
317

  
318
    cbs.AddWfjcResult(None, None, constants.JOB_NOTCHANGED)
319

  
320
    cbs.AddWfjcResult(None, None,
321
                      ((constants.JOB_STATUS_QUEUED, ), None))
322

  
323
    cbs.AddWfjcResult((constants.JOB_STATUS_QUEUED, ), None,
324
                      constants.JOB_NOTCHANGED)
325

  
326
    cbs.AddWfjcResult((constants.JOB_STATUS_QUEUED, ), None,
327
                      ((constants.JOB_STATUS_RUNNING, ),
328
                       [(1, utils.SplitTime(1273491611.0),
329
                         constants.ELOG_MESSAGE, "Step 1"),
330
                        (2, utils.SplitTime(1273491615.9),
331
                         constants.ELOG_MESSAGE, "Step 2"),
332
                        (3, utils.SplitTime(1273491625.02),
333
                         constants.ELOG_MESSAGE, "Step 3"),
334
                        (4, utils.SplitTime(1273491635.05),
335
                         constants.ELOG_MESSAGE, "Step 4"),
336
                        (37, utils.SplitTime(1273491645.0),
337
                         constants.ELOG_MESSAGE, "Step 5"),
338
                        (203, utils.SplitTime(127349155.0),
339
                         constants.ELOG_MESSAGE, "Step 6")]))
340

  
341
    cbs.AddWfjcResult((constants.JOB_STATUS_RUNNING, ), 203,
342
                      ((constants.JOB_STATUS_RUNNING, ),
343
                       [(300, utils.SplitTime(1273491711.01),
344
                         constants.ELOG_MESSAGE, "Step X"),
345
                        (302, utils.SplitTime(1273491815.8),
346
                         constants.ELOG_MESSAGE, "Step Y"),
347
                        (303, utils.SplitTime(1273491925.32),
348
                         constants.ELOG_MESSAGE, "Step Z")]))
349

  
350
    cbs.AddWfjcResult((constants.JOB_STATUS_RUNNING, ), 303,
351
                      ((constants.JOB_STATUS_SUCCESS, ), None))
352

  
353
    cbs.AddQueryJobsResult(constants.JOB_STATUS_SUCCESS,
354
                           [constants.OP_STATUS_SUCCESS,
355
                            constants.OP_STATUS_SUCCESS],
356
                           ["Hello World", "Foo man bar"])
357

  
358
    self.assertEqual(["Hello World", "Foo man bar"],
359
                     cli.GenericPollJob(job_id, cbs, cbs))
360
    cbs.CheckEmpty()
361

  
362
  def testJobLost(self):
363
    job_id = 13746
364

  
365
    cbs = _MockJobPollCb(self, job_id)
366
    cbs.AddWfjcResult(None, None, constants.JOB_NOTCHANGED)
367
    cbs.AddWfjcResult(None, None, None)
368
    self.assertRaises(errors.JobLost, cli.GenericPollJob, job_id, cbs, cbs)
369
    cbs.CheckEmpty()
370

  
371
  def testError(self):
372
    job_id = 31088
373

  
374
    cbs = _MockJobPollCb(self, job_id)
375
    cbs.AddWfjcResult(None, None, constants.JOB_NOTCHANGED)
376
    cbs.AddWfjcResult(None, None, ((constants.JOB_STATUS_ERROR, ), None))
377
    cbs.AddQueryJobsResult(constants.JOB_STATUS_ERROR,
378
                           [constants.OP_STATUS_SUCCESS,
379
                            constants.OP_STATUS_ERROR],
380
                           ["Hello World", "Error code 123"])
381
    self.assertRaises(errors.OpExecError, cli.GenericPollJob, job_id, cbs, cbs)
382
    cbs.CheckEmpty()
383

  
384
  def testError2(self):
385
    job_id = 22235
386

  
387
    cbs = _MockJobPollCb(self, job_id)
388
    cbs.AddWfjcResult(None, None, ((constants.JOB_STATUS_ERROR, ), None))
389
    encexc = errors.EncodeException(errors.LockError("problem"))
390
    cbs.AddQueryJobsResult(constants.JOB_STATUS_ERROR,
391
                           [constants.OP_STATUS_ERROR], [encexc])
392
    self.assertRaises(errors.LockError, cli.GenericPollJob, job_id, cbs, cbs)
393
    cbs.CheckEmpty()
394

  
395
  def testWeirdError(self):
396
    job_id = 28847
397

  
398
    cbs = _MockJobPollCb(self, job_id)
399
    cbs.AddWfjcResult(None, None, ((constants.JOB_STATUS_ERROR, ), None))
400
    cbs.AddQueryJobsResult(constants.JOB_STATUS_ERROR,
401
                           [constants.OP_STATUS_RUNNING,
402
                            constants.OP_STATUS_RUNNING],
403
                           [None, None])
404
    self.assertRaises(errors.OpExecError, cli.GenericPollJob, job_id, cbs, cbs)
405
    cbs.CheckEmpty()
406

  
407
  def testCancel(self):
408
    job_id = 4275
409

  
410
    cbs = _MockJobPollCb(self, job_id)
411
    cbs.AddWfjcResult(None, None, constants.JOB_NOTCHANGED)
412
    cbs.AddWfjcResult(None, None, ((constants.JOB_STATUS_CANCELING, ), None))
413
    cbs.AddQueryJobsResult(constants.JOB_STATUS_CANCELING,
414
                           [constants.OP_STATUS_CANCELING,
415
                            constants.OP_STATUS_CANCELING],
416
                           [None, None])
417
    self.assertRaises(errors.OpExecError, cli.GenericPollJob, job_id, cbs, cbs)
418
    cbs.CheckEmpty()
419

  
420

  
249 421
if __name__ == '__main__':
250 422
  testutils.GanetiTestProgram()

Also available in: Unified diff