Statistics
| Branch: | Tag: | Revision:

root / lib / jstore.py @ e2b4a7ba

History | View | Annotate | Download (5.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling."""
23

    
24
import errno
25
import os
26

    
27
from ganeti import constants
28
from ganeti import errors
29
from ganeti import runtime
30
from ganeti import utils
31
from ganeti import pathutils
32

    
33

    
34
JOBS_PER_ARCHIVE_DIRECTORY = 10000
35

    
36

    
37
def _ReadNumericFile(file_name):
38
  """Reads a file containing a number.
39

40
  @rtype: None or int
41
  @return: None if file is not found, otherwise number
42

43
  """
44
  try:
45
    return int(utils.ReadFile(file_name))
46
  except EnvironmentError, err:
47
    if err.errno in (errno.ENOENT, ):
48
      return None
49
    raise
50

    
51

    
52
def ReadSerial():
53
  """Read the serial file.
54

55
  The queue should be locked while this function is called.
56

57
  """
58
  return _ReadNumericFile(pathutils.JOB_QUEUE_SERIAL_FILE)
59

    
60

    
61
def ReadVersion():
62
  """Read the queue version.
63

64
  The queue should be locked while this function is called.
65

66
  """
67
  return _ReadNumericFile(pathutils.JOB_QUEUE_VERSION_FILE)
68

    
69

    
70
def InitAndVerifyQueue(must_lock):
71
  """Open and lock job queue.
72

73
  If necessary, the queue is automatically initialized.
74

75
  @type must_lock: bool
76
  @param must_lock: Whether an exclusive lock must be held.
77
  @rtype: utils.FileLock
78
  @return: Lock object for the queue. This can be used to change the
79
           locking mode.
80

81
  """
82
  getents = runtime.GetEnts()
83

    
84
  # Lock queue
85
  queue_lock = utils.FileLock.Open(pathutils.JOB_QUEUE_LOCK_FILE)
86
  try:
87
    # The queue needs to be locked in exclusive mode to write to the serial and
88
    # version files.
89
    if must_lock:
90
      queue_lock.Exclusive(blocking=True)
91
      holding_lock = True
92
    else:
93
      try:
94
        queue_lock.Exclusive(blocking=False)
95
        holding_lock = True
96
      except errors.LockError:
97
        # Ignore errors and assume the process keeping the lock checked
98
        # everything.
99
        holding_lock = False
100

    
101
    if holding_lock:
102
      # Verify version
103
      version = ReadVersion()
104
      if version is None:
105
        # Write new version file
106
        utils.WriteFile(pathutils.JOB_QUEUE_VERSION_FILE,
107
                        uid=getents.masterd_uid, gid=getents.masterd_gid,
108
                        data="%s\n" % constants.JOB_QUEUE_VERSION)
109

    
110
        # Read again
111
        version = ReadVersion()
112

    
113
      if version != constants.JOB_QUEUE_VERSION:
114
        raise errors.JobQueueError("Found job queue version %s, expected %s",
115
                                   version, constants.JOB_QUEUE_VERSION)
116

    
117
      serial = ReadSerial()
118
      if serial is None:
119
        # Write new serial file
120
        utils.WriteFile(pathutils.JOB_QUEUE_SERIAL_FILE,
121
                        uid=getents.masterd_uid, gid=getents.masterd_gid,
122
                        data="%s\n" % 0)
123

    
124
        # Read again
125
        serial = ReadSerial()
126

    
127
      if serial is None:
128
        # There must be a serious problem
129
        raise errors.JobQueueError("Can't read/parse the job queue"
130
                                   " serial file")
131

    
132
      if not must_lock:
133
        # There's no need for more error handling. Closing the lock
134
        # file below in case of an error will unlock it anyway.
135
        queue_lock.Unlock()
136

    
137
  except:
138
    queue_lock.Close()
139
    raise
140

    
141
  return queue_lock
142

    
143

    
144
def CheckDrainFlag():
145
  """Check if the queue is marked to be drained.
146

147
  This currently uses the queue drain file, which makes it a per-node flag.
148
  In the future this can be moved to the config file.
149

150
  @rtype: boolean
151
  @return: True if the job queue is marked drained
152

153
  """
154
  return os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
155

    
156

    
157
def SetDrainFlag(drain_flag):
158
  """Sets the drain flag for the queue.
159

160
  @type drain_flag: boolean
161
  @param drain_flag: Whether to set or unset the drain flag
162
  @attention: This function should only called the current holder of the queue
163
    lock
164

165
  """
166
  getents = runtime.GetEnts()
167

    
168
  if drain_flag:
169
    utils.WriteFile(pathutils.JOB_QUEUE_DRAIN_FILE, data="",
170
                    uid=getents.masterd_uid, gid=getents.masterd_gid)
171
  else:
172
    utils.RemoveFile(pathutils.JOB_QUEUE_DRAIN_FILE)
173

    
174
  assert (not drain_flag) ^ CheckDrainFlag()
175

    
176

    
177
def FormatJobID(job_id):
178
  """Convert a job ID to int format.
179

180
  Currently this just is a no-op that performs some checks, but if we
181
  want to change the job id format this will abstract this change.
182

183
  @type job_id: int or long
184
  @param job_id: the numeric job id
185
  @rtype: int
186
  @return: the formatted job id
187

188
  """
189
  if not isinstance(job_id, (int, long)):
190
    raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
191
  if job_id < 0:
192
    raise errors.ProgrammerError("Job ID %s is negative" % job_id)
193

    
194
  return job_id
195

    
196

    
197
def GetArchiveDirectory(job_id):
198
  """Returns the archive directory for a job.
199

200
  @type job_id: str
201
  @param job_id: Job identifier
202
  @rtype: str
203
  @return: Directory name
204

205
  """
206
  return str(ParseJobId(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
207

    
208

    
209
def ParseJobId(job_id):
210
  """Parses a job ID and converts it to integer.
211

212
  """
213
  try:
214
    return int(job_id)
215
  except (ValueError, TypeError):
216
    raise errors.ParameterError("Invalid job ID '%s'" % job_id)