Revision 009e73d0 lib/jqueue.py
b/lib/jqueue.py | ||
---|---|---|
796 | 796 |
""" |
797 | 797 |
return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY) |
798 | 798 |
|
799 |
def _NewSerialUnlocked(self):
|
|
799 |
def _NewSerialsUnlocked(self, count):
|
|
800 | 800 |
"""Generates a new job identifier. |
801 | 801 |
|
802 | 802 |
Job identifiers are unique during the lifetime of a cluster. |
803 | 803 |
|
804 |
@type count: integer |
|
805 |
@param count: how many serials to return |
|
804 | 806 |
@rtype: str |
805 | 807 |
@return: a string representing the job identifier. |
806 | 808 |
|
807 | 809 |
""" |
810 |
assert count > 0 |
|
808 | 811 |
# New number |
809 |
serial = self._last_serial + 1
|
|
812 |
serial = self._last_serial + count
|
|
810 | 813 |
|
811 | 814 |
# Write to file |
812 | 815 |
self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE, |
813 | 816 |
"%s\n" % serial) |
814 | 817 |
|
818 |
result = [self._FormatJobID(v) |
|
819 |
for v in range(self._last_serial, serial + 1)] |
|
815 | 820 |
# Keep it only if we were able to write the file |
816 | 821 |
self._last_serial = serial |
817 | 822 |
|
818 |
return self._FormatJobID(serial)
|
|
823 |
return result
|
|
819 | 824 |
|
820 | 825 |
@staticmethod |
821 | 826 |
def _GetJobPath(job_id): |
... | ... | |
981 | 986 |
return True |
982 | 987 |
|
983 | 988 |
@_RequireOpenQueue |
984 |
def _SubmitJobUnlocked(self, ops): |
|
989 |
def _SubmitJobUnlocked(self, job_id, ops):
|
|
985 | 990 |
"""Create and store a new job. |
986 | 991 |
|
987 | 992 |
This enters the job into our job queue and also puts it on the new |
988 | 993 |
queue, in order for it to be picked up by the queue processors. |
989 | 994 |
|
995 |
@type job_id: job ID |
|
996 |
@param jod_id: the job ID for the new job |
|
990 | 997 |
@type ops: list |
991 | 998 |
@param ops: The list of OpCodes that will become the new job. |
992 | 999 |
@rtype: job ID |
... | ... | |
1008 | 1015 |
if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT: |
1009 | 1016 |
raise errors.JobQueueFull() |
1010 | 1017 |
|
1011 |
# Get job identifier |
|
1012 |
job_id = self._NewSerialUnlocked() |
|
1013 | 1018 |
job = _QueuedJob(self, job_id, ops) |
1014 | 1019 |
|
1015 | 1020 |
# Write to disk |
... | ... | |
1031 | 1036 |
@see: L{_SubmitJobUnlocked} |
1032 | 1037 |
|
1033 | 1038 |
""" |
1034 |
return self._SubmitJobUnlocked(ops) |
|
1039 |
job_id = self._NewSerialsUnlocked(1)[0] |
|
1040 |
return self._SubmitJobUnlocked(job_id, ops) |
|
1035 | 1041 |
|
1036 | 1042 |
@utils.LockedMethod |
1037 | 1043 |
@_RequireOpenQueue |
... | ... | |
1042 | 1048 |
|
1043 | 1049 |
""" |
1044 | 1050 |
results = [] |
1045 |
for ops in jobs: |
|
1051 |
all_job_ids = self._NewSerialsUnlocked(len(jobs)) |
|
1052 |
for job_id, ops in zip(all_job_ids, jobs): |
|
1046 | 1053 |
try: |
1047 |
data = self._SubmitJobUnlocked(ops) |
|
1054 |
data = self._SubmitJobUnlocked(job_id, ops)
|
|
1048 | 1055 |
status = True |
1049 | 1056 |
except errors.GenericError, err: |
1050 | 1057 |
data = str(err) |
Also available in: Unified diff