174 |
174 |
@ivar received_timestamp: the timestamp for when the job was received
|
175 |
175 |
@ivar start_timestmap: the timestamp for start of execution
|
176 |
176 |
@ivar end_timestamp: the timestamp for end of execution
|
|
177 |
@ivar writable: Whether the job is allowed to be modified
|
177 |
178 |
|
178 |
179 |
"""
|
179 |
180 |
# pylint: disable-msg=W0212
|
180 |
181 |
__slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
|
181 |
182 |
"received_timestamp", "start_timestamp", "end_timestamp",
|
182 |
|
"__weakref__", "processor_lock"]
|
|
183 |
"__weakref__", "processor_lock", "writable"]
|
183 |
184 |
|
184 |
|
def __init__(self, queue, job_id, ops):
|
|
185 |
def __init__(self, queue, job_id, ops, writable):
|
185 |
186 |
"""Constructor for the _QueuedJob.
|
186 |
187 |
|
187 |
188 |
@type queue: L{JobQueue}
|
... | ... | |
191 |
192 |
@type ops: list
|
192 |
193 |
@param ops: the list of opcodes we hold, which will be encapsulated
|
193 |
194 |
in _QueuedOpCodes
|
|
195 |
@type writable: bool
|
|
196 |
@param writable: Whether job can be modified
|
194 |
197 |
|
195 |
198 |
"""
|
196 |
199 |
if not ops:
|
... | ... | |
204 |
207 |
self.start_timestamp = None
|
205 |
208 |
self.end_timestamp = None
|
206 |
209 |
|
207 |
|
self._InitInMemory(self)
|
|
210 |
self._InitInMemory(self, writable)
|
208 |
211 |
|
209 |
212 |
@staticmethod
|
210 |
|
def _InitInMemory(obj):
|
|
213 |
def _InitInMemory(obj, writable):
|
211 |
214 |
"""Initializes in-memory variables.
|
212 |
215 |
|
213 |
216 |
"""
|
|
217 |
obj.writable = writable
|
214 |
218 |
obj.ops_iter = None
|
215 |
219 |
obj.cur_opctx = None
|
216 |
220 |
obj.processor_lock = threading.Lock()
|
... | ... | |
223 |
227 |
return "<%s at %#x>" % (" ".join(status), id(self))
|
224 |
228 |
|
225 |
229 |
@classmethod
|
226 |
|
def Restore(cls, queue, state):
|
|
230 |
def Restore(cls, queue, state, writable):
|
227 |
231 |
"""Restore a _QueuedJob from serialized state:
|
228 |
232 |
|
229 |
233 |
@type queue: L{JobQueue}
|
230 |
234 |
@param queue: to which queue the restored job belongs
|
231 |
235 |
@type state: dict
|
232 |
236 |
@param state: the serialized state
|
|
237 |
@type writable: bool
|
|
238 |
@param writable: Whether job can be modified
|
233 |
239 |
@rtype: _JobQueue
|
234 |
240 |
@return: the restored _JobQueue instance
|
235 |
241 |
|
... | ... | |
249 |
255 |
obj.log_serial = max(obj.log_serial, log_entry[0])
|
250 |
256 |
obj.ops.append(op)
|
251 |
257 |
|
252 |
|
cls._InitInMemory(obj)
|
|
258 |
cls._InitInMemory(obj, writable)
|
253 |
259 |
|
254 |
260 |
return obj
|
255 |
261 |
|
... | ... | |
583 |
589 |
@param job: Job object
|
584 |
590 |
|
585 |
591 |
"""
|
|
592 |
assert not job.writable, "Expected read-only job"
|
|
593 |
|
586 |
594 |
status = job.CalcStatus()
|
587 |
595 |
job_info = job.GetInfo(self._fields)
|
588 |
596 |
log_entries = job.GetLogEntries(self._prev_log_serial)
|
... | ... | |
1051 |
1059 |
try:
|
1052 |
1060 |
opcount = len(job.ops)
|
1053 |
1061 |
|
|
1062 |
assert job.writable, "Expected writable job"
|
|
1063 |
|
1054 |
1064 |
# Don't do anything for finalized jobs
|
1055 |
1065 |
if job.CalcStatus() in constants.JOBS_FINALIZED:
|
1056 |
1066 |
return True
|
... | ... | |
1206 |
1216 |
|
1207 |
1217 |
return bool(waitjob)
|
1208 |
1218 |
finally:
|
|
1219 |
assert job.writable, "Job became read-only while being processed"
|
1209 |
1220 |
queue.release()
|
1210 |
1221 |
|
1211 |
1222 |
|
... | ... | |
1823 |
1834 |
job = self._memcache.get(job_id, None)
|
1824 |
1835 |
if job:
|
1825 |
1836 |
logging.debug("Found job %s in memcache", job_id)
|
|
1837 |
assert job.writable, "Found read-only job in memcache"
|
1826 |
1838 |
return job
|
1827 |
1839 |
|
1828 |
1840 |
try:
|
... | ... | |
1841 |
1853 |
self._RenameFilesUnlocked([(old_path, new_path)])
|
1842 |
1854 |
return None
|
1843 |
1855 |
|
|
1856 |
assert job.writable, "Job just loaded is not writable"
|
|
1857 |
|
1844 |
1858 |
self._memcache[job_id] = job
|
1845 |
1859 |
logging.debug("Added job %s to the cache", job_id)
|
1846 |
1860 |
return job
|
1847 |
1861 |
|
1848 |
|
def _LoadJobFromDisk(self, job_id, try_archived):
|
|
1862 |
def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
|
1849 |
1863 |
"""Load the given job file from disk.
|
1850 |
1864 |
|
1851 |
1865 |
Given a job file, read, load and restore it in a _QueuedJob format.
|
... | ... | |
1858 |
1872 |
@return: either None or the job object
|
1859 |
1873 |
|
1860 |
1874 |
"""
|
1861 |
|
path_functions = [self._GetJobPath]
|
|
1875 |
path_functions = [(self._GetJobPath, True)]
|
1862 |
1876 |
|
1863 |
1877 |
if try_archived:
|
1864 |
|
path_functions.append(self._GetArchivedJobPath)
|
|
1878 |
path_functions.append((self._GetArchivedJobPath, False))
|
1865 |
1879 |
|
1866 |
1880 |
raw_data = None
|
|
1881 |
writable_default = None
|
1867 |
1882 |
|
1868 |
|
for fn in path_functions:
|
|
1883 |
for (fn, writable_default) in path_functions:
|
1869 |
1884 |
filepath = fn(job_id)
|
1870 |
1885 |
logging.debug("Loading job from %s", filepath)
|
1871 |
1886 |
try:
|
... | ... | |
1879 |
1894 |
if not raw_data:
|
1880 |
1895 |
return None
|
1881 |
1896 |
|
|
1897 |
if writable is None:
|
|
1898 |
writable = writable_default
|
|
1899 |
|
1882 |
1900 |
try:
|
1883 |
1901 |
data = serializer.LoadJson(raw_data)
|
1884 |
|
job = _QueuedJob.Restore(self, data)
|
|
1902 |
job = _QueuedJob.Restore(self, data, writable)
|
1885 |
1903 |
except Exception, err: # pylint: disable-msg=W0703
|
1886 |
1904 |
raise errors.JobFileCorrupted(err)
|
1887 |
1905 |
|
1888 |
1906 |
return job
|
1889 |
1907 |
|
1890 |
|
def SafeLoadJobFromDisk(self, job_id, try_archived):
|
|
1908 |
def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
|
1891 |
1909 |
"""Load the given job file from disk.
|
1892 |
1910 |
|
1893 |
1911 |
Given a job file, read, load and restore it in a _QueuedJob format.
|
... | ... | |
1903 |
1921 |
|
1904 |
1922 |
"""
|
1905 |
1923 |
try:
|
1906 |
|
return self._LoadJobFromDisk(job_id, try_archived)
|
|
1924 |
return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
|
1907 |
1925 |
except (errors.JobFileCorrupted, EnvironmentError):
|
1908 |
1926 |
logging.exception("Can't load/parse job %s", job_id)
|
1909 |
1927 |
return None
|
... | ... | |
1955 |
1973 |
if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
|
1956 |
1974 |
raise errors.JobQueueFull()
|
1957 |
1975 |
|
1958 |
|
job = _QueuedJob(self, job_id, ops)
|
|
1976 |
job = _QueuedJob(self, job_id, ops, True)
|
1959 |
1977 |
|
1960 |
1978 |
# Check priority
|
1961 |
1979 |
for idx, op in enumerate(job.ops):
|
... | ... | |
2036 |
2054 |
# Not using in-memory cache as doing so would require an exclusive lock
|
2037 |
2055 |
|
2038 |
2056 |
# Try to load from disk
|
2039 |
|
job = self.SafeLoadJobFromDisk(job_id, True)
|
|
2057 |
job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
|
|
2058 |
|
|
2059 |
assert not job.writable, "Got writable job"
|
2040 |
2060 |
|
2041 |
2061 |
if job:
|
2042 |
2062 |
return job.CalcStatus()
|
... | ... | |
2060 |
2080 |
if __debug__:
|
2061 |
2081 |
finalized = job.CalcStatus() in constants.JOBS_FINALIZED
|
2062 |
2082 |
assert (finalized ^ (job.end_timestamp is None))
|
|
2083 |
assert job.writable, "Can't update read-only job"
|
2063 |
2084 |
|
2064 |
2085 |
filename = self._GetJobPath(job.id)
|
2065 |
2086 |
data = serializer.DumpJson(job.Serialize(), indent=False)
|
... | ... | |
2090 |
2111 |
as such by the clients
|
2091 |
2112 |
|
2092 |
2113 |
"""
|
2093 |
|
load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False)
|
|
2114 |
load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False,
|
|
2115 |
writable=False)
|
2094 |
2116 |
|
2095 |
2117 |
helper = _WaitForJobChangesHelper()
|
2096 |
2118 |
|
... | ... | |
2115 |
2137 |
logging.debug("Job %s not found", job_id)
|
2116 |
2138 |
return (False, "Job %s not found" % job_id)
|
2117 |
2139 |
|
|
2140 |
assert job.writable, "Can't cancel read-only job"
|
|
2141 |
|
2118 |
2142 |
(success, msg) = job.Cancel()
|
2119 |
2143 |
|
2120 |
2144 |
if success:
|
... | ... | |
2137 |
2161 |
archive_jobs = []
|
2138 |
2162 |
rename_files = []
|
2139 |
2163 |
for job in jobs:
|
|
2164 |
assert job.writable, "Can't archive read-only job"
|
|
2165 |
|
2140 |
2166 |
if job.CalcStatus() not in constants.JOBS_FINALIZED:
|
2141 |
2167 |
logging.debug("Job %s is not yet done", job.id)
|
2142 |
2168 |
continue
|