Revision b247c6fc lib/jqueue.py
b/lib/jqueue.py | ||
---|---|---|
1982 | 1982 |
raise errors.GenericError("Opcode %s has invalid priority %s, allowed" |
1983 | 1983 |
" are %s" % (idx, op.priority, allowed)) |
1984 | 1984 |
|
1985 |
dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None) |
|
1986 |
if not opcodes.TNoRelativeJobDependencies(dependencies): |
|
1987 |
raise errors.GenericError("Opcode %s has invalid dependencies, must" |
|
1988 |
" match %s: %s" % |
|
1989 |
(idx, opcodes.TNoRelativeJobDependencies, |
|
1990 |
dependencies)) |
|
1991 |
|
|
1985 | 1992 |
# Write to disk |
1986 | 1993 |
self.UpdateJobUnlocked(job) |
1987 | 1994 |
|
... | ... | |
2000 | 2007 |
@see: L{_SubmitJobUnlocked} |
2001 | 2008 |
|
2002 | 2009 |
""" |
2003 |
job_id = self._NewSerialsUnlocked(1)[0]
|
|
2010 |
(job_id, ) = self._NewSerialsUnlocked(1)
|
|
2004 | 2011 |
self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)]) |
2005 | 2012 |
return job_id |
2006 | 2013 |
|
... | ... | |
2012 | 2019 |
@see: L{_SubmitJobUnlocked} |
2013 | 2020 |
|
2014 | 2021 |
""" |
2015 |
results = [] |
|
2016 |
added_jobs = [] |
|
2017 | 2022 |
all_job_ids = self._NewSerialsUnlocked(len(jobs)) |
2018 |
for job_id, ops in zip(all_job_ids, jobs): |
|
2019 |
try: |
|
2020 |
added_jobs.append(self._SubmitJobUnlocked(job_id, ops)) |
|
2021 |
status = True |
|
2022 |
data = job_id |
|
2023 |
except errors.GenericError, err: |
|
2024 |
data = ("%s; opcodes %s" % |
|
2025 |
(err, utils.CommaJoin(op.Summary() for op in ops))) |
|
2026 |
status = False |
|
2027 |
results.append((status, data)) |
|
2023 |
|
|
2024 |
(results, added_jobs) = \ |
|
2025 |
self._SubmitManyJobsUnlocked(jobs, all_job_ids, []) |
|
2028 | 2026 |
|
2029 | 2027 |
self._EnqueueJobs(added_jobs) |
2030 | 2028 |
|
2031 | 2029 |
return results |
2032 | 2030 |
|
2031 |
@staticmethod |
|
2032 |
def _FormatSubmitError(msg, ops): |
|
2033 |
"""Formats errors which occurred while submitting a job. |
|
2034 |
|
|
2035 |
""" |
|
2036 |
return ("%s; opcodes %s" % |
|
2037 |
(msg, utils.CommaJoin(op.Summary() for op in ops))) |
|
2038 |
|
|
2039 |
@staticmethod |
|
2040 |
def _ResolveJobDependencies(resolve_fn, deps): |
|
2041 |
"""Resolves relative job IDs in dependencies. |
|
2042 |
|
|
2043 |
@type resolve_fn: callable |
|
2044 |
@param resolve_fn: Function to resolve a relative job ID |
|
2045 |
@type deps: list |
|
2046 |
@param deps: Dependencies |
|
2047 |
@rtype: list |
|
2048 |
@return: Resolved dependencies |
|
2049 |
|
|
2050 |
""" |
|
2051 |
result = [] |
|
2052 |
|
|
2053 |
for (dep_job_id, dep_status) in deps: |
|
2054 |
if ht.TRelativeJobId(dep_job_id): |
|
2055 |
assert ht.TInt(dep_job_id) and dep_job_id < 0 |
|
2056 |
try: |
|
2057 |
job_id = resolve_fn(dep_job_id) |
|
2058 |
except IndexError: |
|
2059 |
# Abort |
|
2060 |
return (False, "Unable to resolve relative job ID %s" % dep_job_id) |
|
2061 |
else: |
|
2062 |
job_id = dep_job_id |
|
2063 |
|
|
2064 |
result.append((job_id, dep_status)) |
|
2065 |
|
|
2066 |
return (True, result) |
|
2067 |
|
|
2068 |
def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids): |
|
2069 |
"""Create and store multiple jobs. |
|
2070 |
|
|
2071 |
@see: L{_SubmitJobUnlocked} |
|
2072 |
|
|
2073 |
""" |
|
2074 |
results = [] |
|
2075 |
added_jobs = [] |
|
2076 |
|
|
2077 |
def resolve_fn(job_idx, reljobid): |
|
2078 |
assert reljobid < 0 |
|
2079 |
return (previous_job_ids + job_ids[:job_idx])[reljobid] |
|
2080 |
|
|
2081 |
for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)): |
|
2082 |
for op in ops: |
|
2083 |
if getattr(op, opcodes.DEPEND_ATTR, None): |
|
2084 |
(status, data) = \ |
|
2085 |
self._ResolveJobDependencies(compat.partial(resolve_fn, idx), |
|
2086 |
op.depends) |
|
2087 |
if not status: |
|
2088 |
# Abort resolving dependencies |
|
2089 |
assert ht.TNonEmptyString(data), "No error message" |
|
2090 |
break |
|
2091 |
# Use resolved dependencies |
|
2092 |
op.depends = data |
|
2093 |
else: |
|
2094 |
try: |
|
2095 |
job = self._SubmitJobUnlocked(job_id, ops) |
|
2096 |
except errors.GenericError, err: |
|
2097 |
status = False |
|
2098 |
data = self._FormatSubmitError(str(err), ops) |
|
2099 |
else: |
|
2100 |
status = True |
|
2101 |
data = job_id |
|
2102 |
added_jobs.append(job) |
|
2103 |
|
|
2104 |
results.append((status, data)) |
|
2105 |
|
|
2106 |
return (results, added_jobs) |
|
2107 |
|
|
2033 | 2108 |
def _EnqueueJobs(self, jobs): |
2034 | 2109 |
"""Helper function to add jobs to worker pool's queue. |
2035 | 2110 |
|
Also available in: Unified diff