Statistics
| Branch: | Tag: | Revision:

root / lib / jstore.py @ 91c17910

History | View | Annotate | Download (5.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling."""
23

    
24
import errno
25
import os
26

    
27
from ganeti import constants
28
from ganeti import errors
29
from ganeti import runtime
30
from ganeti import utils
31
from ganeti import pathutils
32

    
33

    
34
JOBS_PER_ARCHIVE_DIRECTORY = 10000
35

    
36

    
37
def _ReadNumericFile(file_name):
38
  """Reads a file containing a number.
39

40
  @rtype: None or int
41
  @return: None if file is not found, otherwise number
42

43
  """
44
  try:
45
    contents = utils.ReadFile(file_name)
46
  except EnvironmentError, err:
47
    if err.errno in (errno.ENOENT, ):
48
      return None
49
    raise
50

    
51
  try:
52
    return int(contents)
53
  except (ValueError, TypeError), err:
54
    # Couldn't convert to int
55
    raise errors.JobQueueError("Content of file '%s' is not numeric: %s" %
56
                               (file_name, err))
57

    
58

    
59
def ReadSerial():
60
  """Read the serial file.
61

62
  The queue should be locked while this function is called.
63

64
  """
65
  return _ReadNumericFile(pathutils.JOB_QUEUE_SERIAL_FILE)
66

    
67

    
68
def ReadVersion():
69
  """Read the queue version.
70

71
  The queue should be locked while this function is called.
72

73
  """
74
  return _ReadNumericFile(pathutils.JOB_QUEUE_VERSION_FILE)
75

    
76

    
77
def InitAndVerifyQueue(must_lock):
78
  """Open and lock job queue.
79

80
  If necessary, the queue is automatically initialized.
81

82
  @type must_lock: bool
83
  @param must_lock: Whether an exclusive lock must be held.
84
  @rtype: utils.FileLock
85
  @return: Lock object for the queue. This can be used to change the
86
           locking mode.
87

88
  """
89
  getents = runtime.GetEnts()
90

    
91
  # Lock queue
92
  queue_lock = utils.FileLock.Open(pathutils.JOB_QUEUE_LOCK_FILE)
93
  try:
94
    # The queue needs to be locked in exclusive mode to write to the serial and
95
    # version files.
96
    if must_lock:
97
      queue_lock.Exclusive(blocking=True)
98
      holding_lock = True
99
    else:
100
      try:
101
        queue_lock.Exclusive(blocking=False)
102
        holding_lock = True
103
      except errors.LockError:
104
        # Ignore errors and assume the process keeping the lock checked
105
        # everything.
106
        holding_lock = False
107

    
108
    if holding_lock:
109
      # Verify version
110
      version = ReadVersion()
111
      if version is None:
112
        # Write new version file
113
        utils.WriteFile(pathutils.JOB_QUEUE_VERSION_FILE,
114
                        uid=getents.masterd_uid, gid=getents.masterd_gid,
115
                        data="%s\n" % constants.JOB_QUEUE_VERSION)
116

    
117
        # Read again
118
        version = ReadVersion()
119

    
120
      if version != constants.JOB_QUEUE_VERSION:
121
        raise errors.JobQueueError("Found job queue version %s, expected %s",
122
                                   version, constants.JOB_QUEUE_VERSION)
123

    
124
      serial = ReadSerial()
125
      if serial is None:
126
        # Write new serial file
127
        utils.WriteFile(pathutils.JOB_QUEUE_SERIAL_FILE,
128
                        uid=getents.masterd_uid, gid=getents.masterd_gid,
129
                        data="%s\n" % 0)
130

    
131
        # Read again
132
        serial = ReadSerial()
133

    
134
      if serial is None:
135
        # There must be a serious problem
136
        raise errors.JobQueueError("Can't read/parse the job queue"
137
                                   " serial file")
138

    
139
      if not must_lock:
140
        # There's no need for more error handling. Closing the lock
141
        # file below in case of an error will unlock it anyway.
142
        queue_lock.Unlock()
143

    
144
  except:
145
    queue_lock.Close()
146
    raise
147

    
148
  return queue_lock
149

    
150

    
151
def CheckDrainFlag():
152
  """Check if the queue is marked to be drained.
153

154
  This currently uses the queue drain file, which makes it a per-node flag.
155
  In the future this can be moved to the config file.
156

157
  @rtype: boolean
158
  @return: True if the job queue is marked drained
159

160
  """
161
  return os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
162

    
163

    
164
def SetDrainFlag(drain_flag):
165
  """Sets the drain flag for the queue.
166

167
  @type drain_flag: boolean
168
  @param drain_flag: Whether to set or unset the drain flag
169
  @attention: This function should only called the current holder of the queue
170
    lock
171

172
  """
173
  getents = runtime.GetEnts()
174

    
175
  if drain_flag:
176
    utils.WriteFile(pathutils.JOB_QUEUE_DRAIN_FILE, data="",
177
                    uid=getents.masterd_uid, gid=getents.masterd_gid)
178
  else:
179
    utils.RemoveFile(pathutils.JOB_QUEUE_DRAIN_FILE)
180

    
181
  assert (not drain_flag) ^ CheckDrainFlag()
182

    
183

    
184
def FormatJobID(job_id):
185
  """Convert a job ID to int format.
186

187
  Currently this just is a no-op that performs some checks, but if we
188
  want to change the job id format this will abstract this change.
189

190
  @type job_id: int or long
191
  @param job_id: the numeric job id
192
  @rtype: int
193
  @return: the formatted job id
194

195
  """
196
  if not isinstance(job_id, (int, long)):
197
    raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
198
  if job_id < 0:
199
    raise errors.ProgrammerError("Job ID %s is negative" % job_id)
200

    
201
  return job_id
202

    
203

    
204
def GetArchiveDirectory(job_id):
205
  """Returns the archive directory for a job.
206

207
  @type job_id: str
208
  @param job_id: Job identifier
209
  @rtype: str
210
  @return: Directory name
211

212
  """
213
  return str(ParseJobId(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
214

    
215

    
216
def ParseJobId(job_id):
217
  """Parses a job ID and converts it to integer.
218

219
  """
220
  try:
221
    return int(job_id)
222
  except (ValueError, TypeError):
223
    raise errors.ParameterError("Invalid job ID '%s'" % job_id)