Statistics
| Branch: | Tag: | Revision:

root / lib / jstore.py @ 5349519d

History | View | Annotate | Download (6.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling."""
23

    
24
import errno
25
import os
26

    
27
from ganeti import constants
28
from ganeti import errors
29
from ganeti import runtime
30
from ganeti import utils
31
from ganeti import pathutils
32

    
33

    
34
JOBS_PER_ARCHIVE_DIRECTORY = constants.JSTORE_JOBS_PER_ARCHIVE_DIRECTORY
35

    
36

    
37
def _ReadNumericFile(file_name):
38
  """Reads a file containing a number.
39

40
  @rtype: None or int
41
  @return: None if file is not found, otherwise number
42

43
  """
44
  try:
45
    contents = utils.ReadFile(file_name)
46
  except EnvironmentError, err:
47
    if err.errno in (errno.ENOENT, ):
48
      return None
49
    raise
50

    
51
  try:
52
    return int(contents)
53
  except (ValueError, TypeError), err:
54
    # Couldn't convert to int
55
    raise errors.JobQueueError("Content of file '%s' is not numeric: %s" %
56
                               (file_name, err))
57

    
58

    
59
def ReadSerial():
60
  """Read the serial file.
61

62
  The queue should be locked while this function is called.
63

64
  """
65
  return _ReadNumericFile(pathutils.JOB_QUEUE_SERIAL_FILE)
66

    
67

    
68
def ReadVersion():
69
  """Read the queue version.
70

71
  The queue should be locked while this function is called.
72

73
  """
74
  return _ReadNumericFile(pathutils.JOB_QUEUE_VERSION_FILE)
75

    
76

    
77
def InitAndVerifyQueue(must_lock):
78
  """Open and lock job queue.
79

80
  If necessary, the queue is automatically initialized.
81

82
  @type must_lock: bool
83
  @param must_lock: Whether an exclusive lock must be held.
84
  @rtype: utils.FileLock
85
  @return: Lock object for the queue. This can be used to change the
86
           locking mode.
87

88
  """
89
  getents = runtime.GetEnts()
90

    
91
  # Lock queue
92
  queue_lock = utils.FileLock.Open(pathutils.JOB_QUEUE_LOCK_FILE)
93
  try:
94
    # The queue needs to be locked in exclusive mode to write to the serial and
95
    # version files.
96
    if must_lock:
97
      queue_lock.Exclusive(blocking=True)
98
      holding_lock = True
99
    else:
100
      try:
101
        queue_lock.Exclusive(blocking=False)
102
        holding_lock = True
103
      except errors.LockError:
104
        # Ignore errors and assume the process keeping the lock checked
105
        # everything.
106
        holding_lock = False
107

    
108
    if holding_lock:
109
      # Verify version
110
      version = ReadVersion()
111
      if version is None:
112
        # Write new version file
113
        utils.WriteFile(pathutils.JOB_QUEUE_VERSION_FILE,
114
                        uid=getents.masterd_uid, gid=getents.daemons_gid,
115
                        mode=constants.JOB_QUEUE_FILES_PERMS,
116
                        data="%s\n" % constants.JOB_QUEUE_VERSION)
117

    
118
        # Read again
119
        version = ReadVersion()
120

    
121
      if version != constants.JOB_QUEUE_VERSION:
122
        raise errors.JobQueueError("Found job queue version %s, expected %s",
123
                                   version, constants.JOB_QUEUE_VERSION)
124

    
125
      serial = ReadSerial()
126
      if serial is None:
127
        # Write new serial file
128
        utils.WriteFile(pathutils.JOB_QUEUE_SERIAL_FILE,
129
                        uid=getents.masterd_uid, gid=getents.daemons_gid,
130
                        mode=constants.JOB_QUEUE_FILES_PERMS,
131
                        data="%s\n" % 0)
132

    
133
        # Read again
134
        serial = ReadSerial()
135

    
136
      if serial is None:
137
        # There must be a serious problem
138
        raise errors.JobQueueError("Can't read/parse the job queue"
139
                                   " serial file")
140

    
141
      if not must_lock:
142
        # There's no need for more error handling. Closing the lock
143
        # file below in case of an error will unlock it anyway.
144
        queue_lock.Unlock()
145

    
146
  except:
147
    queue_lock.Close()
148
    raise
149

    
150
  return queue_lock
151

    
152

    
153
def CheckDrainFlag():
154
  """Check if the queue is marked to be drained.
155

156
  This currently uses the queue drain file, which makes it a per-node flag.
157
  In the future this can be moved to the config file.
158

159
  @rtype: boolean
160
  @return: True if the job queue is marked drained
161

162
  """
163
  return os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
164

    
165

    
166
def SetDrainFlag(drain_flag):
167
  """Sets the drain flag for the queue.
168

169
  @type drain_flag: boolean
170
  @param drain_flag: Whether to set or unset the drain flag
171
  @attention: This function should only called the current holder of the queue
172
    lock
173

174
  """
175
  getents = runtime.GetEnts()
176

    
177
  if drain_flag:
178
    utils.WriteFile(pathutils.JOB_QUEUE_DRAIN_FILE, data="",
179
                    uid=getents.masterd_uid, gid=getents.daemons_gid,
180
                    mode=constants.JOB_QUEUE_FILES_PERMS)
181
  else:
182
    utils.RemoveFile(pathutils.JOB_QUEUE_DRAIN_FILE)
183

    
184
  assert (not drain_flag) ^ CheckDrainFlag()
185

    
186

    
187
def FormatJobID(job_id):
188
  """Convert a job ID to int format.
189

190
  Currently this just is a no-op that performs some checks, but if we
191
  want to change the job id format this will abstract this change.
192

193
  @type job_id: int or long
194
  @param job_id: the numeric job id
195
  @rtype: int
196
  @return: the formatted job id
197

198
  """
199
  if not isinstance(job_id, (int, long)):
200
    raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
201
  if job_id < 0:
202
    raise errors.ProgrammerError("Job ID %s is negative" % job_id)
203

    
204
  return job_id
205

    
206

    
207
def GetArchiveDirectory(job_id):
208
  """Returns the archive directory for a job.
209

210
  @type job_id: str
211
  @param job_id: Job identifier
212
  @rtype: str
213
  @return: Directory name
214

215
  """
216
  return str(ParseJobId(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
217

    
218

    
219
def ParseJobId(job_id):
220
  """Parses a job ID and converts it to integer.
221

222
  """
223
  try:
224
    return int(job_id)
225
  except (ValueError, TypeError):
226
    raise errors.ParameterError("Invalid job ID '%s'" % job_id)