Statistics
| Branch: | Tag: | Revision:

root / lib / jstore.py @ 176b0ee2

History | View | Annotate | Download (5.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling."""
23

    
24
import errno
25
import os
26

    
27
from ganeti import constants
28
from ganeti import errors
29
from ganeti import runtime
30
from ganeti import utils
31

    
32

    
33
JOBS_PER_ARCHIVE_DIRECTORY = 10000
34

    
35

    
36
def _ReadNumericFile(file_name):
37
  """Reads a file containing a number.
38

39
  @rtype: None or int
40
  @return: None if file is not found, otherwise number
41

42
  """
43
  try:
44
    return int(utils.ReadFile(file_name))
45
  except EnvironmentError, err:
46
    if err.errno in (errno.ENOENT, ):
47
      return None
48
    raise
49

    
50

    
51
def ReadSerial():
52
  """Read the serial file.
53

54
  The queue should be locked while this function is called.
55

56
  """
57
  return _ReadNumericFile(constants.JOB_QUEUE_SERIAL_FILE)
58

    
59

    
60
def ReadVersion():
61
  """Read the queue version.
62

63
  The queue should be locked while this function is called.
64

65
  """
66
  return _ReadNumericFile(constants.JOB_QUEUE_VERSION_FILE)
67

    
68

    
69
def InitAndVerifyQueue(must_lock):
70
  """Open and lock job queue.
71

72
  If necessary, the queue is automatically initialized.
73

74
  @type must_lock: bool
75
  @param must_lock: Whether an exclusive lock must be held.
76
  @rtype: utils.FileLock
77
  @return: Lock object for the queue. This can be used to change the
78
           locking mode.
79

80
  """
81
  getents = runtime.GetEnts()
82

    
83
  # Lock queue
84
  queue_lock = utils.FileLock.Open(constants.JOB_QUEUE_LOCK_FILE)
85
  try:
86
    # The queue needs to be locked in exclusive mode to write to the serial and
87
    # version files.
88
    if must_lock:
89
      queue_lock.Exclusive(blocking=True)
90
      holding_lock = True
91
    else:
92
      try:
93
        queue_lock.Exclusive(blocking=False)
94
        holding_lock = True
95
      except errors.LockError:
96
        # Ignore errors and assume the process keeping the lock checked
97
        # everything.
98
        holding_lock = False
99

    
100
    if holding_lock:
101
      # Verify version
102
      version = ReadVersion()
103
      if version is None:
104
        # Write new version file
105
        utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
106
                        uid=getents.masterd_uid, gid=getents.masterd_gid,
107
                        data="%s\n" % constants.JOB_QUEUE_VERSION)
108

    
109
        # Read again
110
        version = ReadVersion()
111

    
112
      if version != constants.JOB_QUEUE_VERSION:
113
        raise errors.JobQueueError("Found job queue version %s, expected %s",
114
                                   version, constants.JOB_QUEUE_VERSION)
115

    
116
      serial = ReadSerial()
117
      if serial is None:
118
        # Write new serial file
119
        utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
120
                        uid=getents.masterd_uid, gid=getents.masterd_gid,
121
                        data="%s\n" % 0)
122

    
123
        # Read again
124
        serial = ReadSerial()
125

    
126
      if serial is None:
127
        # There must be a serious problem
128
        raise errors.JobQueueError("Can't read/parse the job queue"
129
                                   " serial file")
130

    
131
      if not must_lock:
132
        # There's no need for more error handling. Closing the lock
133
        # file below in case of an error will unlock it anyway.
134
        queue_lock.Unlock()
135

    
136
  except:
137
    queue_lock.Close()
138
    raise
139

    
140
  return queue_lock
141

    
142

    
143
def CheckDrainFlag():
144
  """Check if the queue is marked to be drained.
145

146
  This currently uses the queue drain file, which makes it a per-node flag.
147
  In the future this can be moved to the config file.
148

149
  @rtype: boolean
150
  @return: True if the job queue is marked drained
151

152
  """
153
  return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
154

    
155

    
156
def SetDrainFlag(drain_flag):
157
  """Sets the drain flag for the queue.
158

159
  @type drain_flag: boolean
160
  @param drain_flag: Whether to set or unset the drain flag
161
  @attention: This function should only called the current holder of the queue
162
    lock
163

164
  """
165
  getents = runtime.GetEnts()
166

    
167
  if drain_flag:
168
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="",
169
                    uid=getents.masterd_uid, gid=getents.masterd_gid)
170
  else:
171
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
172

    
173
  assert (not drain_flag) ^ CheckDrainFlag()
174

    
175

    
176
def FormatJobID(job_id):
177
  """Convert a job ID to string format.
178

179
  Currently this just does C{str(job_id)} after performing some
180
  checks, but if we want to change the job id format this will
181
  abstract this change.
182

183
  @type job_id: int or long
184
  @param job_id: the numeric job id
185
  @rtype: str
186
  @return: the formatted job id
187

188
  """
189
  if not isinstance(job_id, (int, long)):
190
    raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
191
  if job_id < 0:
192
    raise errors.ProgrammerError("Job ID %s is negative" % job_id)
193

    
194
  return str(job_id)
195

    
196

    
197
def GetArchiveDirectory(job_id):
198
  """Returns the archive directory for a job.
199

200
  @type job_id: str
201
  @param job_id: Job identifier
202
  @rtype: str
203
  @return: Directory name
204

205
  """
206
  return str(ParseJobId(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
207

    
208

    
209
def ParseJobId(job_id):
210
  """Parses a job ID and converts it to integer.
211

212
  """
213
  try:
214
    return int(job_id)
215
  except (ValueError, TypeError):
216
    raise errors.ParameterError("Invalid job ID '%s'" % job_id)