Revision 04ab05ce

b/lib/jqueue.py
35 35
from ganeti import errors
36 36
from ganeti import mcpu
37 37
from ganeti import utils
38
from ganeti import jstore
38 39
from ganeti import rpc
39 40

  
40 41

  
......
267 268

  
268 269
    """
269 270
    def wrapper(self, *args, **kwargs):
270
      assert self.lock_fd, "Queue should be open"
271
      assert self._queue_lock is not None, "Queue should be open"
271 272
      return fn(self, *args, **kwargs)
272 273
    return wrapper
273 274

  
......
281 282
    self.acquire = self._lock.acquire
282 283
    self.release = self._lock.release
283 284

  
284
    # Make sure our directories exists
285
    for path in (constants.QUEUE_DIR, constants.JOB_QUEUE_ARCHIVE_DIR):
286
      try:
287
        os.mkdir(path, 0700)
288
      except OSError, err:
289
        if err.errno not in (errno.EEXIST, ):
290
          raise
291

  
292
    # Get queue lock
293
    self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
294
    try:
295
      utils.LockFile(self.lock_fd)
296
    except:
297
      self.lock_fd.close()
298
      raise
299

  
300
    # Read version
301
    try:
302
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
303
    except IOError, err:
304
      if err.errno not in (errno.ENOENT, ):
305
        raise
306

  
307
      # Setup a new queue
308
      self._InitQueueUnlocked()
309

  
310
      # Try to open again
311
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
285
    # Initialize
286
    self._queue_lock = jstore.InitAndVerifyQueue(exclusive=True)
312 287

  
313
    try:
314
      # Try to read version
315
      version = int(version_fd.read(128))
316

  
317
      # Verify version
318
      if version != constants.JOB_QUEUE_VERSION:
319
        raise errors.JobQueueError("Found version %s, expected %s",
320
                                   version, constants.JOB_QUEUE_VERSION)
321
    finally:
322
      version_fd.close()
323

  
324
    self._last_serial = self._ReadSerial()
325
    if self._last_serial is None:
326
      raise errors.ConfigurationError("Can't read/parse the job queue serial"
327
                                      " file")
288
    # Read serial file
289
    self._last_serial = jstore.ReadSerial()
290
    assert self._last_serial is not None, ("Serial file was modified between"
291
                                           " check in jstore and here")
328 292

  
329 293
    # Setup worker pool
330 294
    self._wpool = _JobQueueWorkerPool(self)
......
350 314
    finally:
351 315
      self.release()
352 316

  
353
  @staticmethod
354
  def _ReadSerial():
355
    """Try to read the job serial file.
356

  
357
    @rtype: None or int
358
    @return: If the serial can be read, then it is returned. Otherwise None
359
             is returned.
360

  
361
    """
362
    try:
363
      serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
364
      try:
365
        # Read last serial
366
        serial = int(serial_fd.read(1024).strip())
367
      finally:
368
        serial_fd.close()
369
    except (ValueError, EnvironmentError):
370
      serial = None
371

  
372
    return serial
373

  
374
  def _InitQueueUnlocked(self):
375
    utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
376
                    data="%s\n" % constants.JOB_QUEUE_VERSION)
377
    if self._ReadSerial() is None:
378
      utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
379
                      data="%s\n" % 0)
380

  
381 317
  def _FormatJobID(self, job_id):
382 318
    if not isinstance(job_id, (int, long)):
383 319
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
......
657 593
    """
658 594
    self._wpool.TerminateWorkers()
659 595

  
660
    self.lock_fd.close()
661
    self.lock_fd = None
596
    self._queue_lock.Close()
597
    self._queue_lock = None

Also available in: Unified diff