Revision 7b5c4a69 lib/jqueue.py
b/lib/jqueue.py | ||
---|---|---|
975 | 975 |
""" |
976 | 976 |
logging.info("Inspecting job queue") |
977 | 977 |
|
978 |
restartjobs = [] |
|
979 |
|
|
978 | 980 |
all_job_ids = self._GetJobIDsUnlocked() |
979 | 981 |
jobs_count = len(all_job_ids) |
980 | 982 |
lastinfo = time.time() |
... | ... | |
995 | 997 |
status = job.CalcStatus() |
996 | 998 |
|
997 | 999 |
if status in (constants.JOB_STATUS_QUEUED, ): |
998 |
self._wpool.AddTask((job, ))
|
|
1000 |
restartjobs.append(job)
|
|
999 | 1001 |
|
1000 | 1002 |
elif status in (constants.JOB_STATUS_RUNNING, |
1001 | 1003 |
constants.JOB_STATUS_WAITLOCK, |
... | ... | |
1005 | 1007 |
"Unclean master daemon shutdown") |
1006 | 1008 |
self.UpdateJobUnlocked(job) |
1007 | 1009 |
|
1010 |
if restartjobs: |
|
1011 |
logging.info("Restarting %s jobs", len(restartjobs)) |
|
1012 |
self._EnqueueJobs(restartjobs) |
|
1013 |
|
|
1008 | 1014 |
logging.info("Job queue inspection finished") |
1009 | 1015 |
|
1010 | 1016 |
@locking.ssynchronized(_LOCK) |
... | ... | |
1434 | 1440 |
|
1435 | 1441 |
""" |
1436 | 1442 |
job_id = self._NewSerialsUnlocked(1)[0] |
1437 |
self._wpool.AddTask((self._SubmitJobUnlocked(job_id, ops), ))
|
|
1443 |
self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
|
|
1438 | 1444 |
return job_id |
1439 | 1445 |
|
1440 | 1446 |
@locking.ssynchronized(_LOCK) |
... | ... | |
1446 | 1452 |
|
1447 | 1453 |
""" |
1448 | 1454 |
results = [] |
1449 |
tasks = []
|
|
1455 |
added_jobs = []
|
|
1450 | 1456 |
all_job_ids = self._NewSerialsUnlocked(len(jobs)) |
1451 | 1457 |
for job_id, ops in zip(all_job_ids, jobs): |
1452 | 1458 |
try: |
1453 |
tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
|
|
1459 |
added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
|
|
1454 | 1460 |
status = True |
1455 | 1461 |
data = job_id |
1456 | 1462 |
except errors.GenericError, err: |
1457 | 1463 |
data = str(err) |
1458 | 1464 |
status = False |
1459 | 1465 |
results.append((status, data)) |
1460 |
self._wpool.AddManyTasks(tasks) |
|
1466 |
|
|
1467 |
self._EnqueueJobs(added_jobs) |
|
1461 | 1468 |
|
1462 | 1469 |
return results |
1463 | 1470 |
|
1471 |
def _EnqueueJobs(self, jobs): |
|
1472 |
"""Helper function to add jobs to worker pool's queue. |
|
1473 |
|
|
1474 |
@type jobs: list |
|
1475 |
@param jobs: List of all jobs |
|
1476 |
|
|
1477 |
""" |
|
1478 |
self._wpool.AddManyTasks([(job, ) for job in jobs], |
|
1479 |
priority=[job.CalcPriority() for job in jobs]) |
|
1480 |
|
|
1464 | 1481 |
@_RequireOpenQueue |
1465 | 1482 |
def UpdateJobUnlocked(self, job, replicate=True): |
1466 | 1483 |
"""Update a job's on disk storage. |
Also available in: Unified diff