Revision 5d6fb8eb lib/jstore.py

b/lib/jstore.py
69 69
  return _ReadNumericFile(constants.JOB_QUEUE_VERSION_FILE)
70 70

  
71 71

  
72
def InitAndVerifyQueue(exclusive):
72
def InitAndVerifyQueue(must_lock):
73 73
  """Open and lock job queue.
74 74

  
75 75
  If necessary, the queue is automatically initialized.
76 76

  
77
  @type exclusive: bool
78
  @param exclusive: Whether to lock the queue in exclusive mode. Shared
79
                    mode otherwise.
77
  @type must_lock: bool
78
  @param must_lock: Whether an exclusive lock must be held.
80 79
  @rtype: utils.FileLock
81 80
  @return: Lock object for the queue. This can be used to change the
82 81
           locking mode.
......
93 92
  # Lock queue
94 93
  queue_lock = utils.FileLock(constants.JOB_QUEUE_LOCK_FILE)
95 94
  try:
96
    # Determine locking function and call it
97
    if exclusive:
98
      fn = queue_lock.Exclusive
95
    # The queue needs to be locked in exclusive mode to write to the serial and
96
    # version files.
97
    if must_lock:
98
      queue_lock.Exclusive(blocking=True)
99
      holding_lock = True
99 100
    else:
100
      fn = queue_lock.Shared
101

  
102
    fn(blocking=False)
103

  
104
    # Verify version
105
    version = ReadVersion()
106
    if version is None:
107
      # Write new version file
108
      utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
109
                      data="%s\n" % constants.JOB_QUEUE_VERSION)
110

  
111
      # Read again
101
      try:
102
        queue_lock.Exclusive(blocking=False)
103
        holding_lock = True
104
      except errors.LockError:
105
        # Ignore errors and assume the process keeping the lock checked
106
        # everything.
107
        holding_lock = False
108

  
109
    if holding_lock:
110
      # Verify version
112 111
      version = ReadVersion()
112
      if version is None:
113
        # Write new version file
114
        utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
115
                        data="%s\n" % constants.JOB_QUEUE_VERSION)
113 116

  
114
    if version != constants.JOB_QUEUE_VERSION:
115
      raise errors.JobQueueError("Found job queue version %s, expected %s",
116
                                 version, constants.JOB_QUEUE_VERSION)
117
        # Read again
118
        version = ReadVersion()
117 119

  
118
    serial = ReadSerial()
119
    if serial is None:
120
      # Write new serial file
121
      utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
122
                      data="%s\n" % 0)
120
      if version != constants.JOB_QUEUE_VERSION:
121
        raise errors.JobQueueError("Found job queue version %s, expected %s",
122
                                   version, constants.JOB_QUEUE_VERSION)
123 123

  
124
      # Read again
125 124
      serial = ReadSerial()
125
      if serial is None:
126
        # Write new serial file
127
        utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
128
                        data="%s\n" % 0)
129

  
130
        # Read again
131
        serial = ReadSerial()
126 132

  
127
    if serial is None:
128
      # There must be a serious problem
129
      raise errors.JobQueueError("Can't read/parse the job queue serial file")
133
      if serial is None:
134
        # There must be a serious problem
135
        raise errors.JobQueueError("Can't read/parse the job queue serial file")
130 136

  
131 137
  except:
132 138
    queue_lock.Close()

Also available in: Unified diff