Revision c0f6d0d8 lib/jqueue.py
b/lib/jqueue.py | ||
---|---|---|
174 | 174 |
@ivar received_timestamp: the timestamp for when the job was received |
175 | 175 |
@ivar start_timestmap: the timestamp for start of execution |
176 | 176 |
@ivar end_timestamp: the timestamp for end of execution |
177 |
@ivar writable: Whether the job is allowed to be modified |
|
177 | 178 |
|
178 | 179 |
""" |
179 | 180 |
# pylint: disable-msg=W0212 |
180 | 181 |
__slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx", |
181 | 182 |
"received_timestamp", "start_timestamp", "end_timestamp", |
182 |
"__weakref__", "processor_lock"] |
|
183 |
"__weakref__", "processor_lock", "writable"]
|
|
183 | 184 |
|
184 |
def __init__(self, queue, job_id, ops): |
|
185 |
def __init__(self, queue, job_id, ops, writable):
|
|
185 | 186 |
"""Constructor for the _QueuedJob. |
186 | 187 |
|
187 | 188 |
@type queue: L{JobQueue} |
... | ... | |
191 | 192 |
@type ops: list |
192 | 193 |
@param ops: the list of opcodes we hold, which will be encapsulated |
193 | 194 |
in _QueuedOpCodes |
195 |
@type writable: bool |
|
196 |
@param writable: Whether job can be modified |
|
194 | 197 |
|
195 | 198 |
""" |
196 | 199 |
if not ops: |
... | ... | |
204 | 207 |
self.start_timestamp = None |
205 | 208 |
self.end_timestamp = None |
206 | 209 |
|
207 |
self._InitInMemory(self) |
|
210 |
self._InitInMemory(self, writable)
|
|
208 | 211 |
|
209 | 212 |
@staticmethod |
210 |
def _InitInMemory(obj): |
|
213 |
def _InitInMemory(obj, writable):
|
|
211 | 214 |
"""Initializes in-memory variables. |
212 | 215 |
|
213 | 216 |
""" |
217 |
obj.writable = writable |
|
214 | 218 |
obj.ops_iter = None |
215 | 219 |
obj.cur_opctx = None |
216 | 220 |
obj.processor_lock = threading.Lock() |
... | ... | |
223 | 227 |
return "<%s at %#x>" % (" ".join(status), id(self)) |
224 | 228 |
|
225 | 229 |
@classmethod |
226 |
def Restore(cls, queue, state): |
|
230 |
def Restore(cls, queue, state, writable):
|
|
227 | 231 |
"""Restore a _QueuedJob from serialized state: |
228 | 232 |
|
229 | 233 |
@type queue: L{JobQueue} |
230 | 234 |
@param queue: to which queue the restored job belongs |
231 | 235 |
@type state: dict |
232 | 236 |
@param state: the serialized state |
237 |
@type writable: bool |
|
238 |
@param writable: Whether job can be modified |
|
233 | 239 |
@rtype: _JobQueue |
234 | 240 |
@return: the restored _JobQueue instance |
235 | 241 |
|
... | ... | |
249 | 255 |
obj.log_serial = max(obj.log_serial, log_entry[0]) |
250 | 256 |
obj.ops.append(op) |
251 | 257 |
|
252 |
cls._InitInMemory(obj) |
|
258 |
cls._InitInMemory(obj, writable)
|
|
253 | 259 |
|
254 | 260 |
return obj |
255 | 261 |
|
... | ... | |
583 | 589 |
@param job: Job object |
584 | 590 |
|
585 | 591 |
""" |
592 |
assert not job.writable, "Expected read-only job" |
|
593 |
|
|
586 | 594 |
status = job.CalcStatus() |
587 | 595 |
job_info = job.GetInfo(self._fields) |
588 | 596 |
log_entries = job.GetLogEntries(self._prev_log_serial) |
... | ... | |
1051 | 1059 |
try: |
1052 | 1060 |
opcount = len(job.ops) |
1053 | 1061 |
|
1062 |
assert job.writable, "Expected writable job" |
|
1063 |
|
|
1054 | 1064 |
# Don't do anything for finalized jobs |
1055 | 1065 |
if job.CalcStatus() in constants.JOBS_FINALIZED: |
1056 | 1066 |
return True |
... | ... | |
1206 | 1216 |
|
1207 | 1217 |
return bool(waitjob) |
1208 | 1218 |
finally: |
1219 |
assert job.writable, "Job became read-only while being processed" |
|
1209 | 1220 |
queue.release() |
1210 | 1221 |
|
1211 | 1222 |
|
... | ... | |
1823 | 1834 |
job = self._memcache.get(job_id, None) |
1824 | 1835 |
if job: |
1825 | 1836 |
logging.debug("Found job %s in memcache", job_id) |
1837 |
assert job.writable, "Found read-only job in memcache" |
|
1826 | 1838 |
return job |
1827 | 1839 |
|
1828 | 1840 |
try: |
... | ... | |
1841 | 1853 |
self._RenameFilesUnlocked([(old_path, new_path)]) |
1842 | 1854 |
return None |
1843 | 1855 |
|
1856 |
assert job.writable, "Job just loaded is not writable" |
|
1857 |
|
|
1844 | 1858 |
self._memcache[job_id] = job |
1845 | 1859 |
logging.debug("Added job %s to the cache", job_id) |
1846 | 1860 |
return job |
1847 | 1861 |
|
1848 |
def _LoadJobFromDisk(self, job_id, try_archived): |
|
1862 |
def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
|
|
1849 | 1863 |
"""Load the given job file from disk. |
1850 | 1864 |
|
1851 | 1865 |
Given a job file, read, load and restore it in a _QueuedJob format. |
... | ... | |
1858 | 1872 |
@return: either None or the job object |
1859 | 1873 |
|
1860 | 1874 |
""" |
1861 |
path_functions = [self._GetJobPath]
|
|
1875 |
path_functions = [(self._GetJobPath, True)]
|
|
1862 | 1876 |
|
1863 | 1877 |
if try_archived: |
1864 |
path_functions.append(self._GetArchivedJobPath)
|
|
1878 |
path_functions.append((self._GetArchivedJobPath, False))
|
|
1865 | 1879 |
|
1866 | 1880 |
raw_data = None |
1881 |
writable_default = None |
|
1867 | 1882 |
|
1868 |
for fn in path_functions:
|
|
1883 |
for (fn, writable_default) in path_functions:
|
|
1869 | 1884 |
filepath = fn(job_id) |
1870 | 1885 |
logging.debug("Loading job from %s", filepath) |
1871 | 1886 |
try: |
... | ... | |
1879 | 1894 |
if not raw_data: |
1880 | 1895 |
return None |
1881 | 1896 |
|
1897 |
if writable is None: |
|
1898 |
writable = writable_default |
|
1899 |
|
|
1882 | 1900 |
try: |
1883 | 1901 |
data = serializer.LoadJson(raw_data) |
1884 |
job = _QueuedJob.Restore(self, data) |
|
1902 |
job = _QueuedJob.Restore(self, data, writable)
|
|
1885 | 1903 |
except Exception, err: # pylint: disable-msg=W0703 |
1886 | 1904 |
raise errors.JobFileCorrupted(err) |
1887 | 1905 |
|
1888 | 1906 |
return job |
1889 | 1907 |
|
1890 |
def SafeLoadJobFromDisk(self, job_id, try_archived): |
|
1908 |
def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
|
|
1891 | 1909 |
"""Load the given job file from disk. |
1892 | 1910 |
|
1893 | 1911 |
Given a job file, read, load and restore it in a _QueuedJob format. |
... | ... | |
1903 | 1921 |
|
1904 | 1922 |
""" |
1905 | 1923 |
try: |
1906 |
return self._LoadJobFromDisk(job_id, try_archived) |
|
1924 |
return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
|
|
1907 | 1925 |
except (errors.JobFileCorrupted, EnvironmentError): |
1908 | 1926 |
logging.exception("Can't load/parse job %s", job_id) |
1909 | 1927 |
return None |
... | ... | |
1955 | 1973 |
if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT: |
1956 | 1974 |
raise errors.JobQueueFull() |
1957 | 1975 |
|
1958 |
job = _QueuedJob(self, job_id, ops) |
|
1976 |
job = _QueuedJob(self, job_id, ops, True)
|
|
1959 | 1977 |
|
1960 | 1978 |
# Check priority |
1961 | 1979 |
for idx, op in enumerate(job.ops): |
... | ... | |
2036 | 2054 |
# Not using in-memory cache as doing so would require an exclusive lock |
2037 | 2055 |
|
2038 | 2056 |
# Try to load from disk |
2039 |
job = self.SafeLoadJobFromDisk(job_id, True) |
|
2057 |
job = self.SafeLoadJobFromDisk(job_id, True, writable=False) |
|
2058 |
|
|
2059 |
assert not job.writable, "Got writable job" |
|
2040 | 2060 |
|
2041 | 2061 |
if job: |
2042 | 2062 |
return job.CalcStatus() |
... | ... | |
2060 | 2080 |
if __debug__: |
2061 | 2081 |
finalized = job.CalcStatus() in constants.JOBS_FINALIZED |
2062 | 2082 |
assert (finalized ^ (job.end_timestamp is None)) |
2083 |
assert job.writable, "Can't update read-only job" |
|
2063 | 2084 |
|
2064 | 2085 |
filename = self._GetJobPath(job.id) |
2065 | 2086 |
data = serializer.DumpJson(job.Serialize(), indent=False) |
... | ... | |
2090 | 2111 |
as such by the clients |
2091 | 2112 |
|
2092 | 2113 |
""" |
2093 |
load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False) |
|
2114 |
load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False, |
|
2115 |
writable=False) |
|
2094 | 2116 |
|
2095 | 2117 |
helper = _WaitForJobChangesHelper() |
2096 | 2118 |
|
... | ... | |
2115 | 2137 |
logging.debug("Job %s not found", job_id) |
2116 | 2138 |
return (False, "Job %s not found" % job_id) |
2117 | 2139 |
|
2140 |
assert job.writable, "Can't cancel read-only job" |
|
2141 |
|
|
2118 | 2142 |
(success, msg) = job.Cancel() |
2119 | 2143 |
|
2120 | 2144 |
if success: |
... | ... | |
2137 | 2161 |
archive_jobs = [] |
2138 | 2162 |
rename_files = [] |
2139 | 2163 |
for job in jobs: |
2164 |
assert job.writable, "Can't archive read-only job" |
|
2165 |
|
|
2140 | 2166 |
if job.CalcStatus() not in constants.JOBS_FINALIZED: |
2141 | 2167 |
logging.debug("Job %s is not yet done", job.id) |
2142 | 2168 |
continue |
Also available in: Unified diff