Revision 4e338533
b/lib/cli.py | ||
---|---|---|
1223 | 1223 |
return job_id |
1224 | 1224 |
|
1225 | 1225 |
|
1226 |
def PollJob(job_id, cl=None, feedback_fn=None):
|
|
1227 |
"""Function to poll for the result of a job.
|
|
1226 |
def GenericPollJob(job_id, cbs, report_cbs):
|
|
1227 |
"""Generic job-polling function.
|
|
1228 | 1228 |
|
1229 |
@type job_id: job identified |
|
1230 |
@param job_id: the job to poll for results |
|
1231 |
@type cl: luxi.Client |
|
1232 |
@param cl: the luxi client to use for communicating with the master; |
|
1233 |
if None, a new client will be created |
|
1229 |
@type job_id: number |
|
1230 |
@param job_id: Job ID |
|
1231 |
@type cbs: Instance of L{JobPollCbBase} |
|
1232 |
@param cbs: Data callbacks |
|
1233 |
@type report_cbs: Instance of L{JobPollReportCbBase} |
|
1234 |
@param report_cbs: Reporting callbacks |
|
1234 | 1235 |
|
1235 | 1236 |
""" |
1236 |
if cl is None: |
|
1237 |
cl = GetClient() |
|
1238 |
|
|
1239 | 1237 |
prev_job_info = None |
1240 | 1238 |
prev_logmsg_serial = None |
1241 | 1239 |
|
1242 | 1240 |
status = None |
1243 | 1241 |
|
1244 |
notified_queued = False |
|
1245 |
notified_waitlock = False |
|
1246 |
|
|
1247 | 1242 |
while True: |
1248 |
result = cl.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
|
|
1249 |
prev_logmsg_serial) |
|
1243 |
result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
|
|
1244 |
prev_logmsg_serial)
|
|
1250 | 1245 |
if not result: |
1251 | 1246 |
# job not found, go away! |
1252 | 1247 |
raise errors.JobLost("Job with id %s lost" % job_id) |
1253 |
elif result == constants.JOB_NOTCHANGED: |
|
1254 |
if status is not None and not callable(feedback_fn): |
|
1255 |
if status == constants.JOB_STATUS_QUEUED and not notified_queued: |
|
1256 |
ToStderr("Job %s is waiting in queue", job_id) |
|
1257 |
notified_queued = True |
|
1258 |
elif status == constants.JOB_STATUS_WAITLOCK and not notified_waitlock: |
|
1259 |
ToStderr("Job %s is trying to acquire all necessary locks", job_id) |
|
1260 |
notified_waitlock = True |
|
1248 |
|
|
1249 |
if result == constants.JOB_NOTCHANGED: |
|
1250 |
report_cbs.ReportNotChanged(job_id, status) |
|
1261 | 1251 |
|
1262 | 1252 |
# Wait again |
1263 | 1253 |
continue |
... | ... | |
1268 | 1258 |
|
1269 | 1259 |
if log_entries: |
1270 | 1260 |
for log_entry in log_entries: |
1271 |
(serial, timestamp, _, message) = log_entry |
|
1272 |
if callable(feedback_fn): |
|
1273 |
feedback_fn(log_entry[1:]) |
|
1274 |
else: |
|
1275 |
encoded = utils.SafeEncode(message) |
|
1276 |
ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded) |
|
1261 |
(serial, timestamp, log_type, message) = log_entry |
|
1262 |
report_cbs.ReportLogMessage(job_id, serial, timestamp, |
|
1263 |
log_type, message) |
|
1277 | 1264 |
prev_logmsg_serial = max(prev_logmsg_serial, serial) |
1278 | 1265 |
|
1279 | 1266 |
# TODO: Handle canceled and archived jobs |
... | ... | |
1285 | 1272 |
|
1286 | 1273 |
prev_job_info = job_info |
1287 | 1274 |
|
1288 |
jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
|
|
1275 |
jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
|
|
1289 | 1276 |
if not jobs: |
1290 | 1277 |
raise errors.JobLost("Job with id %s lost" % job_id) |
1291 | 1278 |
|
1292 | 1279 |
status, opstatus, result = jobs[0] |
1280 |
|
|
1293 | 1281 |
if status == constants.JOB_STATUS_SUCCESS: |
1294 | 1282 |
return result |
1295 |
elif status in (constants.JOB_STATUS_CANCELING, |
|
1296 |
constants.JOB_STATUS_CANCELED):
|
|
1283 |
|
|
1284 |
if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
|
|
1297 | 1285 |
raise errors.OpExecError("Job was canceled") |
1286 |
|
|
1287 |
has_ok = False |
|
1288 |
for idx, (status, msg) in enumerate(zip(opstatus, result)): |
|
1289 |
if status == constants.OP_STATUS_SUCCESS: |
|
1290 |
has_ok = True |
|
1291 |
elif status == constants.OP_STATUS_ERROR: |
|
1292 |
errors.MaybeRaise(msg) |
|
1293 |
|
|
1294 |
if has_ok: |
|
1295 |
raise errors.OpExecError("partial failure (opcode %d): %s" % |
|
1296 |
(idx, msg)) |
|
1297 |
|
|
1298 |
raise errors.OpExecError(str(msg)) |
|
1299 |
|
|
1300 |
# default failure mode |
|
1301 |
raise errors.OpExecError(result) |
|
1302 |
|
|
1303 |
|
|
1304 |
class JobPollCbBase: |
|
1305 |
"""Base class for L{GenericPollJob} callbacks. |
|
1306 |
|
|
1307 |
""" |
|
1308 |
def __init__(self): |
|
1309 |
"""Initializes this class. |
|
1310 |
|
|
1311 |
""" |
|
1312 |
|
|
1313 |
def WaitForJobChangeOnce(self, job_id, fields, |
|
1314 |
prev_job_info, prev_log_serial): |
|
1315 |
"""Waits for changes on a job. |
|
1316 |
|
|
1317 |
""" |
|
1318 |
raise NotImplementedError() |
|
1319 |
|
|
1320 |
def QueryJobs(self, job_ids, fields): |
|
1321 |
"""Returns the selected fields for the selected job IDs. |
|
1322 |
|
|
1323 |
@type job_ids: list of numbers |
|
1324 |
@param job_ids: Job IDs |
|
1325 |
@type fields: list of strings |
|
1326 |
@param fields: Fields |
|
1327 |
|
|
1328 |
""" |
|
1329 |
raise NotImplementedError() |
|
1330 |
|
|
1331 |
|
|
1332 |
class JobPollReportCbBase: |
|
1333 |
"""Base class for L{GenericPollJob} reporting callbacks. |
|
1334 |
|
|
1335 |
""" |
|
1336 |
def __init__(self): |
|
1337 |
"""Initializes this class. |
|
1338 |
|
|
1339 |
""" |
|
1340 |
|
|
1341 |
def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg): |
|
1342 |
"""Handles a log message. |
|
1343 |
|
|
1344 |
""" |
|
1345 |
raise NotImplementedError() |
|
1346 |
|
|
1347 |
def ReportNotChanged(self, job_id, status): |
|
1348 |
"""Called for if a job hasn't changed in a while. |
|
1349 |
|
|
1350 |
@type job_id: number |
|
1351 |
@param job_id: Job ID |
|
1352 |
@type status: string or None |
|
1353 |
@param status: Job status if available |
|
1354 |
|
|
1355 |
""" |
|
1356 |
raise NotImplementedError() |
|
1357 |
|
|
1358 |
|
|
1359 |
class _LuxiJobPollCb(JobPollCbBase): |
|
1360 |
def __init__(self, cl): |
|
1361 |
"""Initializes this class. |
|
1362 |
|
|
1363 |
""" |
|
1364 |
JobPollCbBase.__init__(self) |
|
1365 |
self.cl = cl |
|
1366 |
|
|
1367 |
def WaitForJobChangeOnce(self, job_id, fields, |
|
1368 |
prev_job_info, prev_log_serial): |
|
1369 |
"""Waits for changes on a job. |
|
1370 |
|
|
1371 |
""" |
|
1372 |
return self.cl.WaitForJobChangeOnce(job_id, fields, |
|
1373 |
prev_job_info, prev_log_serial) |
|
1374 |
|
|
1375 |
def QueryJobs(self, job_ids, fields): |
|
1376 |
"""Returns the selected fields for the selected job IDs. |
|
1377 |
|
|
1378 |
""" |
|
1379 |
return self.cl.QueryJobs(job_ids, fields) |
|
1380 |
|
|
1381 |
|
|
1382 |
class FeedbackFnJobPollReportCb(JobPollReportCbBase): |
|
1383 |
def __init__(self, feedback_fn): |
|
1384 |
"""Initializes this class. |
|
1385 |
|
|
1386 |
""" |
|
1387 |
JobPollReportCbBase.__init__(self) |
|
1388 |
|
|
1389 |
self.feedback_fn = feedback_fn |
|
1390 |
|
|
1391 |
assert callable(feedback_fn) |
|
1392 |
|
|
1393 |
def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg): |
|
1394 |
"""Handles a log message. |
|
1395 |
|
|
1396 |
""" |
|
1397 |
self.feedback_fn((timestamp, log_type, log_msg)) |
|
1398 |
|
|
1399 |
def ReportNotChanged(self, job_id, status): |
|
1400 |
"""Called if a job hasn't changed in a while. |
|
1401 |
|
|
1402 |
""" |
|
1403 |
# Ignore |
|
1404 |
|
|
1405 |
|
|
1406 |
class StdioJobPollReportCb(JobPollReportCbBase): |
|
1407 |
def __init__(self): |
|
1408 |
"""Initializes this class. |
|
1409 |
|
|
1410 |
""" |
|
1411 |
JobPollReportCbBase.__init__(self) |
|
1412 |
|
|
1413 |
self.notified_queued = False |
|
1414 |
self.notified_waitlock = False |
|
1415 |
|
|
1416 |
def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg): |
|
1417 |
"""Handles a log message. |
|
1418 |
|
|
1419 |
""" |
|
1420 |
ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), |
|
1421 |
utils.SafeEncode(log_msg)) |
|
1422 |
|
|
1423 |
def ReportNotChanged(self, job_id, status): |
|
1424 |
"""Called if a job hasn't changed in a while. |
|
1425 |
|
|
1426 |
""" |
|
1427 |
if status is None: |
|
1428 |
return |
|
1429 |
|
|
1430 |
if status == constants.JOB_STATUS_QUEUED and not self.notified_queued: |
|
1431 |
ToStderr("Job %s is waiting in queue", job_id) |
|
1432 |
self.notified_queued = True |
|
1433 |
|
|
1434 |
elif status == constants.JOB_STATUS_WAITLOCK and not self.notified_waitlock: |
|
1435 |
ToStderr("Job %s is trying to acquire all necessary locks", job_id) |
|
1436 |
self.notified_waitlock = True |
|
1437 |
|
|
1438 |
|
|
1439 |
def PollJob(job_id, cl=None, feedback_fn=None): |
|
1440 |
"""Function to poll for the result of a job. |
|
1441 |
|
|
1442 |
@type job_id: job identified |
|
1443 |
@param job_id: the job to poll for results |
|
1444 |
@type cl: luxi.Client |
|
1445 |
@param cl: the luxi client to use for communicating with the master; |
|
1446 |
if None, a new client will be created |
|
1447 |
|
|
1448 |
""" |
|
1449 |
if cl is None: |
|
1450 |
cl = GetClient() |
|
1451 |
|
|
1452 |
if feedback_fn: |
|
1453 |
reporter = FeedbackFnJobPollReportCb(feedback_fn) |
|
1298 | 1454 |
else: |
1299 |
has_ok = False |
|
1300 |
for idx, (status, msg) in enumerate(zip(opstatus, result)): |
|
1301 |
if status == constants.OP_STATUS_SUCCESS: |
|
1302 |
has_ok = True |
|
1303 |
elif status == constants.OP_STATUS_ERROR: |
|
1304 |
errors.MaybeRaise(msg) |
|
1305 |
if has_ok: |
|
1306 |
raise errors.OpExecError("partial failure (opcode %d): %s" % |
|
1307 |
(idx, msg)) |
|
1308 |
else: |
|
1309 |
raise errors.OpExecError(str(msg)) |
|
1310 |
# default failure mode |
|
1311 |
raise errors.OpExecError(result) |
|
1455 |
reporter = StdioJobPollReportCb() |
|
1456 |
|
|
1457 |
return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter) |
|
1312 | 1458 |
|
1313 | 1459 |
|
1314 | 1460 |
def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None): |
b/test/ganeti.cli_unittest.py | ||
---|---|---|
29 | 29 |
|
30 | 30 |
from ganeti import constants |
31 | 31 |
from ganeti import cli |
32 |
from ganeti import errors |
|
33 |
from ganeti import utils |
|
32 | 34 |
from ganeti.errors import OpPrereqError, ParameterError |
33 | 35 |
|
34 | 36 |
|
... | ... | |
100 | 102 |
|
101 | 103 |
|
102 | 104 |
class TestToStream(unittest.TestCase): |
103 |
"""Thes the ToStream functions"""
|
|
105 |
"""Test the ToStream functions"""
|
|
104 | 106 |
|
105 | 107 |
def testBasic(self): |
106 | 108 |
for data in ["foo", |
... | ... | |
246 | 248 |
None, None, "m", exp) |
247 | 249 |
|
248 | 250 |
|
251 |
class _MockJobPollCb(cli.JobPollCbBase, cli.JobPollReportCbBase): |
|
252 |
def __init__(self, tc, job_id): |
|
253 |
self.tc = tc |
|
254 |
self.job_id = job_id |
|
255 |
self._wfjcr = [] |
|
256 |
self._jobstatus = [] |
|
257 |
self._expect_notchanged = False |
|
258 |
self._expect_log = [] |
|
259 |
|
|
260 |
def CheckEmpty(self): |
|
261 |
self.tc.assertFalse(self._wfjcr) |
|
262 |
self.tc.assertFalse(self._jobstatus) |
|
263 |
self.tc.assertFalse(self._expect_notchanged) |
|
264 |
self.tc.assertFalse(self._expect_log) |
|
265 |
|
|
266 |
def AddWfjcResult(self, *args): |
|
267 |
self._wfjcr.append(args) |
|
268 |
|
|
269 |
def AddQueryJobsResult(self, *args): |
|
270 |
self._jobstatus.append(args) |
|
271 |
|
|
272 |
def WaitForJobChangeOnce(self, job_id, fields, |
|
273 |
prev_job_info, prev_log_serial): |
|
274 |
self.tc.assertEqual(job_id, self.job_id) |
|
275 |
self.tc.assertEqualValues(fields, ["status"]) |
|
276 |
self.tc.assertFalse(self._expect_notchanged) |
|
277 |
self.tc.assertFalse(self._expect_log) |
|
278 |
|
|
279 |
(exp_prev_job_info, exp_prev_log_serial, result) = self._wfjcr.pop(0) |
|
280 |
self.tc.assertEqualValues(prev_job_info, exp_prev_job_info) |
|
281 |
self.tc.assertEqual(prev_log_serial, exp_prev_log_serial) |
|
282 |
|
|
283 |
if result == constants.JOB_NOTCHANGED: |
|
284 |
self._expect_notchanged = True |
|
285 |
elif result: |
|
286 |
(_, logmsgs) = result |
|
287 |
if logmsgs: |
|
288 |
self._expect_log.extend(logmsgs) |
|
289 |
|
|
290 |
return result |
|
291 |
|
|
292 |
def QueryJobs(self, job_ids, fields): |
|
293 |
self.tc.assertEqual(job_ids, [self.job_id]) |
|
294 |
self.tc.assertEqualValues(fields, ["status", "opstatus", "opresult"]) |
|
295 |
self.tc.assertFalse(self._expect_notchanged) |
|
296 |
self.tc.assertFalse(self._expect_log) |
|
297 |
|
|
298 |
result = self._jobstatus.pop(0) |
|
299 |
self.tc.assertEqual(len(fields), len(result)) |
|
300 |
return [result] |
|
301 |
|
|
302 |
def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg): |
|
303 |
self.tc.assertEqual(job_id, self.job_id) |
|
304 |
self.tc.assertEqualValues((serial, timestamp, log_type, log_msg), |
|
305 |
self._expect_log.pop(0)) |
|
306 |
|
|
307 |
def ReportNotChanged(self, job_id, status): |
|
308 |
self.tc.assertEqual(job_id, self.job_id) |
|
309 |
self.tc.assert_(self._expect_notchanged) |
|
310 |
self._expect_notchanged = False |
|
311 |
|
|
312 |
|
|
313 |
class TestGenericPollJob(testutils.GanetiTestCase): |
|
314 |
def testSuccessWithLog(self): |
|
315 |
job_id = 29609 |
|
316 |
cbs = _MockJobPollCb(self, job_id) |
|
317 |
|
|
318 |
cbs.AddWfjcResult(None, None, constants.JOB_NOTCHANGED) |
|
319 |
|
|
320 |
cbs.AddWfjcResult(None, None, |
|
321 |
((constants.JOB_STATUS_QUEUED, ), None)) |
|
322 |
|
|
323 |
cbs.AddWfjcResult((constants.JOB_STATUS_QUEUED, ), None, |
|
324 |
constants.JOB_NOTCHANGED) |
|
325 |
|
|
326 |
cbs.AddWfjcResult((constants.JOB_STATUS_QUEUED, ), None, |
|
327 |
((constants.JOB_STATUS_RUNNING, ), |
|
328 |
[(1, utils.SplitTime(1273491611.0), |
|
329 |
constants.ELOG_MESSAGE, "Step 1"), |
|
330 |
(2, utils.SplitTime(1273491615.9), |
|
331 |
constants.ELOG_MESSAGE, "Step 2"), |
|
332 |
(3, utils.SplitTime(1273491625.02), |
|
333 |
constants.ELOG_MESSAGE, "Step 3"), |
|
334 |
(4, utils.SplitTime(1273491635.05), |
|
335 |
constants.ELOG_MESSAGE, "Step 4"), |
|
336 |
(37, utils.SplitTime(1273491645.0), |
|
337 |
constants.ELOG_MESSAGE, "Step 5"), |
|
338 |
(203, utils.SplitTime(127349155.0), |
|
339 |
constants.ELOG_MESSAGE, "Step 6")])) |
|
340 |
|
|
341 |
cbs.AddWfjcResult((constants.JOB_STATUS_RUNNING, ), 203, |
|
342 |
((constants.JOB_STATUS_RUNNING, ), |
|
343 |
[(300, utils.SplitTime(1273491711.01), |
|
344 |
constants.ELOG_MESSAGE, "Step X"), |
|
345 |
(302, utils.SplitTime(1273491815.8), |
|
346 |
constants.ELOG_MESSAGE, "Step Y"), |
|
347 |
(303, utils.SplitTime(1273491925.32), |
|
348 |
constants.ELOG_MESSAGE, "Step Z")])) |
|
349 |
|
|
350 |
cbs.AddWfjcResult((constants.JOB_STATUS_RUNNING, ), 303, |
|
351 |
((constants.JOB_STATUS_SUCCESS, ), None)) |
|
352 |
|
|
353 |
cbs.AddQueryJobsResult(constants.JOB_STATUS_SUCCESS, |
|
354 |
[constants.OP_STATUS_SUCCESS, |
|
355 |
constants.OP_STATUS_SUCCESS], |
|
356 |
["Hello World", "Foo man bar"]) |
|
357 |
|
|
358 |
self.assertEqual(["Hello World", "Foo man bar"], |
|
359 |
cli.GenericPollJob(job_id, cbs, cbs)) |
|
360 |
cbs.CheckEmpty() |
|
361 |
|
|
362 |
def testJobLost(self): |
|
363 |
job_id = 13746 |
|
364 |
|
|
365 |
cbs = _MockJobPollCb(self, job_id) |
|
366 |
cbs.AddWfjcResult(None, None, constants.JOB_NOTCHANGED) |
|
367 |
cbs.AddWfjcResult(None, None, None) |
|
368 |
self.assertRaises(errors.JobLost, cli.GenericPollJob, job_id, cbs, cbs) |
|
369 |
cbs.CheckEmpty() |
|
370 |
|
|
371 |
def testError(self): |
|
372 |
job_id = 31088 |
|
373 |
|
|
374 |
cbs = _MockJobPollCb(self, job_id) |
|
375 |
cbs.AddWfjcResult(None, None, constants.JOB_NOTCHANGED) |
|
376 |
cbs.AddWfjcResult(None, None, ((constants.JOB_STATUS_ERROR, ), None)) |
|
377 |
cbs.AddQueryJobsResult(constants.JOB_STATUS_ERROR, |
|
378 |
[constants.OP_STATUS_SUCCESS, |
|
379 |
constants.OP_STATUS_ERROR], |
|
380 |
["Hello World", "Error code 123"]) |
|
381 |
self.assertRaises(errors.OpExecError, cli.GenericPollJob, job_id, cbs, cbs) |
|
382 |
cbs.CheckEmpty() |
|
383 |
|
|
384 |
def testError2(self): |
|
385 |
job_id = 22235 |
|
386 |
|
|
387 |
cbs = _MockJobPollCb(self, job_id) |
|
388 |
cbs.AddWfjcResult(None, None, ((constants.JOB_STATUS_ERROR, ), None)) |
|
389 |
encexc = errors.EncodeException(errors.LockError("problem")) |
|
390 |
cbs.AddQueryJobsResult(constants.JOB_STATUS_ERROR, |
|
391 |
[constants.OP_STATUS_ERROR], [encexc]) |
|
392 |
self.assertRaises(errors.LockError, cli.GenericPollJob, job_id, cbs, cbs) |
|
393 |
cbs.CheckEmpty() |
|
394 |
|
|
395 |
def testWeirdError(self): |
|
396 |
job_id = 28847 |
|
397 |
|
|
398 |
cbs = _MockJobPollCb(self, job_id) |
|
399 |
cbs.AddWfjcResult(None, None, ((constants.JOB_STATUS_ERROR, ), None)) |
|
400 |
cbs.AddQueryJobsResult(constants.JOB_STATUS_ERROR, |
|
401 |
[constants.OP_STATUS_RUNNING, |
|
402 |
constants.OP_STATUS_RUNNING], |
|
403 |
[None, None]) |
|
404 |
self.assertRaises(errors.OpExecError, cli.GenericPollJob, job_id, cbs, cbs) |
|
405 |
cbs.CheckEmpty() |
|
406 |
|
|
407 |
def testCancel(self): |
|
408 |
job_id = 4275 |
|
409 |
|
|
410 |
cbs = _MockJobPollCb(self, job_id) |
|
411 |
cbs.AddWfjcResult(None, None, constants.JOB_NOTCHANGED) |
|
412 |
cbs.AddWfjcResult(None, None, ((constants.JOB_STATUS_CANCELING, ), None)) |
|
413 |
cbs.AddQueryJobsResult(constants.JOB_STATUS_CANCELING, |
|
414 |
[constants.OP_STATUS_CANCELING, |
|
415 |
constants.OP_STATUS_CANCELING], |
|
416 |
[None, None]) |
|
417 |
self.assertRaises(errors.OpExecError, cli.GenericPollJob, job_id, cbs, cbs) |
|
418 |
cbs.CheckEmpty() |
|
419 |
|
|
420 |
|
|
249 | 421 |
if __name__ == '__main__': |
250 | 422 |
testutils.GanetiTestProgram() |
Also available in: Unified diff