Revision c0f6d0d8

b/lib/jqueue.py
174 174
  @ivar received_timestamp: the timestamp for when the job was received
175 175
  @ivar start_timestmap: the timestamp for start of execution
176 176
  @ivar end_timestamp: the timestamp for end of execution
177
  @ivar writable: Whether the job is allowed to be modified
177 178

  
178 179
  """
179 180
  # pylint: disable-msg=W0212
180 181
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
181 182
               "received_timestamp", "start_timestamp", "end_timestamp",
182
               "__weakref__", "processor_lock"]
183
               "__weakref__", "processor_lock", "writable"]
183 184

  
184
  def __init__(self, queue, job_id, ops):
185
  def __init__(self, queue, job_id, ops, writable):
185 186
    """Constructor for the _QueuedJob.
186 187

  
187 188
    @type queue: L{JobQueue}
......
191 192
    @type ops: list
192 193
    @param ops: the list of opcodes we hold, which will be encapsulated
193 194
        in _QueuedOpCodes
195
    @type writable: bool
196
    @param writable: Whether job can be modified
194 197

  
195 198
    """
196 199
    if not ops:
......
204 207
    self.start_timestamp = None
205 208
    self.end_timestamp = None
206 209

  
207
    self._InitInMemory(self)
210
    self._InitInMemory(self, writable)
208 211

  
209 212
  @staticmethod
210
  def _InitInMemory(obj):
213
  def _InitInMemory(obj, writable):
211 214
    """Initializes in-memory variables.
212 215

  
213 216
    """
217
    obj.writable = writable
214 218
    obj.ops_iter = None
215 219
    obj.cur_opctx = None
216 220
    obj.processor_lock = threading.Lock()
......
223 227
    return "<%s at %#x>" % (" ".join(status), id(self))
224 228

  
225 229
  @classmethod
226
  def Restore(cls, queue, state):
230
  def Restore(cls, queue, state, writable):
227 231
    """Restore a _QueuedJob from serialized state:
228 232

  
229 233
    @type queue: L{JobQueue}
230 234
    @param queue: to which queue the restored job belongs
231 235
    @type state: dict
232 236
    @param state: the serialized state
237
    @type writable: bool
238
    @param writable: Whether job can be modified
233 239
    @rtype: _JobQueue
234 240
    @return: the restored _JobQueue instance
235 241

  
......
249 255
        obj.log_serial = max(obj.log_serial, log_entry[0])
250 256
      obj.ops.append(op)
251 257

  
252
    cls._InitInMemory(obj)
258
    cls._InitInMemory(obj, writable)
253 259

  
254 260
    return obj
255 261

  
......
583 589
    @param job: Job object
584 590

  
585 591
    """
592
    assert not job.writable, "Expected read-only job"
593

  
586 594
    status = job.CalcStatus()
587 595
    job_info = job.GetInfo(self._fields)
588 596
    log_entries = job.GetLogEntries(self._prev_log_serial)
......
1051 1059
    try:
1052 1060
      opcount = len(job.ops)
1053 1061

  
1062
      assert job.writable, "Expected writable job"
1063

  
1054 1064
      # Don't do anything for finalized jobs
1055 1065
      if job.CalcStatus() in constants.JOBS_FINALIZED:
1056 1066
        return True
......
1206 1216

  
1207 1217
      return bool(waitjob)
1208 1218
    finally:
1219
      assert job.writable, "Job became read-only while being processed"
1209 1220
      queue.release()
1210 1221

  
1211 1222

  
......
1823 1834
    job = self._memcache.get(job_id, None)
1824 1835
    if job:
1825 1836
      logging.debug("Found job %s in memcache", job_id)
1837
      assert job.writable, "Found read-only job in memcache"
1826 1838
      return job
1827 1839

  
1828 1840
    try:
......
1841 1853
        self._RenameFilesUnlocked([(old_path, new_path)])
1842 1854
      return None
1843 1855

  
1856
    assert job.writable, "Job just loaded is not writable"
1857

  
1844 1858
    self._memcache[job_id] = job
1845 1859
    logging.debug("Added job %s to the cache", job_id)
1846 1860
    return job
1847 1861

  
1848
  def _LoadJobFromDisk(self, job_id, try_archived):
1862
  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
1849 1863
    """Load the given job file from disk.
1850 1864

  
1851 1865
    Given a job file, read, load and restore it in a _QueuedJob format.
......
1858 1872
    @return: either None or the job object
1859 1873

  
1860 1874
    """
1861
    path_functions = [self._GetJobPath]
1875
    path_functions = [(self._GetJobPath, True)]
1862 1876

  
1863 1877
    if try_archived:
1864
      path_functions.append(self._GetArchivedJobPath)
1878
      path_functions.append((self._GetArchivedJobPath, False))
1865 1879

  
1866 1880
    raw_data = None
1881
    writable_default = None
1867 1882

  
1868
    for fn in path_functions:
1883
    for (fn, writable_default) in path_functions:
1869 1884
      filepath = fn(job_id)
1870 1885
      logging.debug("Loading job from %s", filepath)
1871 1886
      try:
......
1879 1894
    if not raw_data:
1880 1895
      return None
1881 1896

  
1897
    if writable is None:
1898
      writable = writable_default
1899

  
1882 1900
    try:
1883 1901
      data = serializer.LoadJson(raw_data)
1884
      job = _QueuedJob.Restore(self, data)
1902
      job = _QueuedJob.Restore(self, data, writable)
1885 1903
    except Exception, err: # pylint: disable-msg=W0703
1886 1904
      raise errors.JobFileCorrupted(err)
1887 1905

  
1888 1906
    return job
1889 1907

  
1890
  def SafeLoadJobFromDisk(self, job_id, try_archived):
1908
  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
1891 1909
    """Load the given job file from disk.
1892 1910

  
1893 1911
    Given a job file, read, load and restore it in a _QueuedJob format.
......
1903 1921

  
1904 1922
    """
1905 1923
    try:
1906
      return self._LoadJobFromDisk(job_id, try_archived)
1924
      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
1907 1925
    except (errors.JobFileCorrupted, EnvironmentError):
1908 1926
      logging.exception("Can't load/parse job %s", job_id)
1909 1927
      return None
......
1955 1973
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1956 1974
      raise errors.JobQueueFull()
1957 1975

  
1958
    job = _QueuedJob(self, job_id, ops)
1976
    job = _QueuedJob(self, job_id, ops, True)
1959 1977

  
1960 1978
    # Check priority
1961 1979
    for idx, op in enumerate(job.ops):
......
2036 2054
    # Not using in-memory cache as doing so would require an exclusive lock
2037 2055

  
2038 2056
    # Try to load from disk
2039
    job = self.SafeLoadJobFromDisk(job_id, True)
2057
    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2058

  
2059
    assert not job.writable, "Got writable job"
2040 2060

  
2041 2061
    if job:
2042 2062
      return job.CalcStatus()
......
2060 2080
    if __debug__:
2061 2081
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
2062 2082
      assert (finalized ^ (job.end_timestamp is None))
2083
      assert job.writable, "Can't update read-only job"
2063 2084

  
2064 2085
    filename = self._GetJobPath(job.id)
2065 2086
    data = serializer.DumpJson(job.Serialize(), indent=False)
......
2090 2111
        as such by the clients
2091 2112

  
2092 2113
    """
2093
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False)
2114
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False,
2115
                             writable=False)
2094 2116

  
2095 2117
    helper = _WaitForJobChangesHelper()
2096 2118

  
......
2115 2137
      logging.debug("Job %s not found", job_id)
2116 2138
      return (False, "Job %s not found" % job_id)
2117 2139

  
2140
    assert job.writable, "Can't cancel read-only job"
2141

  
2118 2142
    (success, msg) = job.Cancel()
2119 2143

  
2120 2144
    if success:
......
2137 2161
    archive_jobs = []
2138 2162
    rename_files = []
2139 2163
    for job in jobs:
2164
      assert job.writable, "Can't archive read-only job"
2165

  
2140 2166
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
2141 2167
        logging.debug("Job %s is not yet done", job.id)
2142 2168
        continue
b/test/ganeti.jqueue_unittest.py
43 43
class _FakeJob:
44 44
  def __init__(self, job_id, status):
45 45
    self.id = job_id
46
    self.writable = False
46 47
    self._status = status
47 48
    self._log = []
48 49

  
......
279 280
class TestQueuedJob(unittest.TestCase):
280 281
  def test(self):
281 282
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
282
                      None, 1, [])
283
                      None, 1, [], False)
283 284

  
284 285
  def testDefaults(self):
285 286
    job_id = 4260
......
289 290
      ]
290 291

  
291 292
    def _Check(job):
293
      self.assertTrue(job.writable)
292 294
      self.assertEqual(job.id, job_id)
293 295
      self.assertEqual(job.log_serial, 0)
294 296
      self.assert_(job.received_timestamp)
......
305 307
      self.assertEqual(job.GetInfo(["summary"]),
306 308
                       [[op.input.Summary() for op in job.ops]])
307 309

  
308
    job1 = jqueue._QueuedJob(None, job_id, ops)
310
    job1 = jqueue._QueuedJob(None, job_id, ops, True)
309 311
    _Check(job1)
310
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
312
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True)
311 313
    _Check(job2)
312 314
    self.assertEqual(job1.Serialize(), job2.Serialize())
313 315

  
316
  def testWritable(self):
317
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
318
    self.assertFalse(job.writable)
319

  
320
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], True)
321
    self.assertTrue(job.writable)
322

  
314 323
  def testPriority(self):
315 324
    job_id = 4283
316 325
    ops = [
......
323 332
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
324 333
      self.assert_(repr(job).startswith("<"))
325 334

  
326
    job = jqueue._QueuedJob(None, job_id, ops)
335
    job = jqueue._QueuedJob(None, job_id, ops, True)
327 336
    _Check(job)
328 337
    self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
329 338
                            for op in job.ops))
......
409 418

  
410 419
    def _NewJob():
411 420
      job = jqueue._QueuedJob(None, 1,
412
                              [opcodes.OpTestDelay() for _ in range(10)])
421
                              [opcodes.OpTestDelay() for _ in range(10)],
422
                              True)
413 423
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
414 424
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
415 425
                              for op in job.ops))
......
550 560

  
551 561
class _JobProcessorTestUtils:
552 562
  def _CreateJob(self, queue, job_id, ops):
553
    job = jqueue._QueuedJob(queue, job_id, ops)
563
    job = jqueue._QueuedJob(queue, job_id, ops, True)
554 564
    self.assertFalse(job.start_timestamp)
555 565
    self.assertFalse(job.end_timestamp)
556 566
    self.assertEqual(len(ops), len(job.ops))
......
972 982
      self.assert_(job.ops_iter)
973 983

  
974 984
      # Serialize and restore (simulates program restart)
975
      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
985
      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
976 986
      self.assertFalse(newjob.ops_iter)
977 987
      self._TestPartial(newjob, successcount)
978 988

  
......
1016 1026
    self.assertRaises(IndexError, queue.GetNextUpdate)
1017 1027

  
1018 1028
    # ... also after being restored
1019
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
1029
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
1020 1030
    # Calling the processor on a finished job should be a no-op
1021 1031
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job2)())
1022 1032
    self.assertRaises(IndexError, queue.GetNextUpdate)
......
1117 1127
    self._CheckLogMessages(job, logmsgcount)
1118 1128

  
1119 1129
    # Serialize and restore (simulates program restart)
1120
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
1130
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
1121 1131
    self._CheckLogMessages(newjob, logmsgcount)
1122 1132

  
1123 1133
    # Check each message

Also available in: Unified diff