Revision f1da30e6

b/lib/constants.py
91 91
# Files kept under the cluster data directory (DATA_DIR)
SSL_CERT_FILE = DATA_DIR + "/server.pem"
WATCHER_STATEFILE = DATA_DIR + "/watcher.data"
SSH_KNOWN_HOSTS_FILE = DATA_DIR + "/known_hosts"
# Directory holding the on-disk job queue (one file per job; managed by
# jqueue.JobStorage)
QUEUE_DIR = DATA_DIR + "/queue"
ETC_HOSTS = "/etc/hosts"
DEFAULT_FILE_STORAGE_DIR = _autoconf.FILE_STORAGE_DIR
MASTER_SOCKET = RUN_GANETI_DIR + "/master.sock"
......
IARUN_FAILURE = 2
IARUN_SUCCESS = 3

# Job queue
# On-disk queue format version; verified by jqueue.JobStorage at startup
JOB_QUEUE_VERSION = 1
# Lock file held by the process owning the queue
JOB_QUEUE_LOCK_FILE = QUEUE_DIR + "/lock"
# File storing JOB_QUEUE_VERSION
JOB_QUEUE_VERSION_FILE = QUEUE_DIR + "/version"
# File storing the last used job serial number
JOB_QUEUE_SERIAL_FILE = QUEUE_DIR + "/serial"

# Job status
JOB_STATUS_QUEUED = "queued"
JOB_STATUS_RUNNING = "running"
b/lib/errors.py
236 236

  
237 237
  """
238 238

  
239

  
240
class JobQueueError(Exception):
  """Job queue error.

  Raised for problems with the on-disk job queue, e.g. a version
  mismatch found in the queue directory.

  """
b/lib/jqueue.py
21 21

  
22 22
"""Module implementing the job queue handling."""
23 23

  
24
import os
24 25
import logging
25 26
import threading
27
import errno
28
import re
26 29

  
27 30
from ganeti import constants
31
from ganeti import serializer
28 32
from ganeti import workerpool
33
from ganeti import opcodes
29 34
from ganeti import errors
30 35
from ganeti import mcpu
31 36
from ganeti import utils
......
41 46

  
42 47
  """
43 48
  def __init__(self, op):
    """Wrap a new opcode for queueing.

    Initialisation is delegated to __Setup so that Restore() can share
    the same code path; the opcode starts in the QUEUED state with no
    result.

    """
    self.__Setup(op, constants.OP_STATUS_QUEUED, None)
50

  
51
  def __Setup(self, op_input, status, result):
    """Set this wrapper's internal state.

    Shared by __init__ and Restore(). The first parameter was renamed
    from ``input`` to avoid shadowing the builtin of the same name;
    both callers pass it positionally.

    Args:
      op_input: the wrapped opcode object
      status: one of the constants.OP_STATUS_* values
      result: the opcode result (None while not yet executed)

    """
    self._lock = threading.Lock()
    self.input = op_input
    self.status = status
    self.result = result
56

  
57
  @classmethod
  def Restore(cls, state):
    """Rebuild a queued opcode from a Serialize() dictionary."""
    obj = object.__new__(cls)
    restored_input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.__Setup(restored_input, state["status"], state["result"])
    return obj
63

  
64
  @utils.LockedMethod
  def Serialize(self):
    """Return a JSON-serializable dictionary describing this opcode."""
    data = {}
    data["input"] = self.input.__getstate__()
    data["status"] = self.status
    data["result"] = self.result
    return data
48 71

  
49 72
  @utils.LockedMethod
50 73
  def GetInput(self):
......
82 105
  This is what we use to track the user-submitted jobs.
83 106

  
84 107
  """
85
  def __init__(self, storage, job_id, ops):
    """Create a queued job from a list of opcodes.

    Args:
      storage: the JobStorage instance owning this job
      job_id: the job identifier
      ops: non-empty sequence of opcodes to run

    Raises a plain Exception when ops is empty (TODO: use a more
    specific error class).

    """
    if not ops:
      # TODO
      raise Exception("No opcodes")

    wrapped_ops = [_QueuedOpCode(op) for op in ops]
    self.__Setup(storage, job_id, wrapped_ops)
115
  def __Setup(self, storage, job_id, ops):
    """Assign the job's internal state (shared by __init__ and Restore)."""
    self.storage = storage
    self.id = job_id
    self._ops = ops
119

  
120
  @classmethod
  def Restore(cls, storage, state):
    """Rebuild a queued job from a Serialize() dictionary."""
    obj = object.__new__(cls)
    restored_ops = [_QueuedOpCode.Restore(op_state)
                    for op_state in state["ops"]]
    obj.__Setup(storage, state["id"], restored_ops)
    return obj
126

  
127
  def Serialize(self):
    """Return a JSON-serializable dictionary describing this job."""
    op_states = [op.Serialize() for op in self._ops]
    return {
      "id": self.id,
      "ops": op_states,
      }
132

  
133
  def SetUnclean(self, msg):
    """Mark all of the job's opcodes as failed with the given message.

    The job is written back to storage even if marking an opcode
    raises, so the on-disk state stays as current as possible.

    """
    try:
      for op in self._ops:
        op.SetStatus(constants.OP_STATUS_ERROR, msg)
    finally:
      self.storage.UpdateJob(self)
96 139

  
97 140
  def GetStatus(self):
98 141
    status = constants.JOB_STATUS_QUEUED
......
107 150

  
108 151
      if op_status == constants.OP_STATUS_QUEUED:
109 152
        pass
110
      elif op_status == constants.OP_STATUS_ERROR:
111
        status = constants.JOB_STATUS_ERROR
112 153
      elif op_status == constants.OP_STATUS_RUNNING:
113 154
        status = constants.JOB_STATUS_RUNNING
155
      elif op_status == constants.OP_STATUS_ERROR:
156
        status = constants.JOB_STATUS_ERROR
157
        # The whole job fails if one opcode failed
158
        break
114 159

  
115 160
    if all_success:
116 161
      status = constants.JOB_STATUS_SUCCESS
......
133 178
        try:
134 179
          logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
135 180
          op.SetStatus(constants.OP_STATUS_RUNNING, None)
181
          self.storage.UpdateJob(self)
136 182

  
137 183
          result = proc.ExecOpCode(op.input)
138 184

  
139 185
          op.SetStatus(constants.OP_STATUS_SUCCESS, result)
186
          self.storage.UpdateJob(self)
140 187
          logging.debug("Op %s/%s: Successfully finished %s",
141 188
                        idx + 1, count, op)
142 189
        except Exception, err:
143
          op.SetStatus(constants.OP_STATUS_ERROR, str(err))
144
          logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
190
          try:
191
            op.SetStatus(constants.OP_STATUS_ERROR, str(err))
192
            logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
193
          finally:
194
            self.storage.UpdateJob(self)
145 195
          raise
146 196

  
147 197
    except errors.GenericError, err:
......
172 222
    self.context = context
173 223

  
174 224

  
225
class JobStorage(object):
226
  _RE_JOB_FILE = re.compile(r"^job-\d+$")
227

  
228
  def __init__(self):
229
    self._lock = threading.Lock()
230

  
231
    # Make sure our directory exists
232
    try:
233
      os.mkdir(constants.QUEUE_DIR, 0700)
234
    except OSError, err:
235
      if err.errno not in (errno.EEXIST, ):
236
        raise
237

  
238
    # Get queue lock
239
    self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
240
    try:
241
      utils.LockFile(self.lock_fd)
242
    except:
243
      self.lock_fd.close()
244
      raise
245

  
246
    # Read version
247
    try:
248
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
249
    except IOError, err:
250
      if err.errno not in (errno.ENOENT, ):
251
        raise
252

  
253
      # Setup a new queue
254
      self._InitQueueUnlocked()
255

  
256
      # Try to open again
257
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
258

  
259
    try:
260
      # Try to read version
261
      version = int(version_fd.read(128))
262

  
263
      # Verify version
264
      if version != constants.JOB_QUEUE_VERSION:
265
        raise errors.JobQueueError("Found version %s, expected %s",
266
                                   version, constants.JOB_QUEUE_VERSION)
267
    finally:
268
      version_fd.close()
269

  
270
    serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
271
    try:
272
      # Read last serial
273
      self._last_serial = int(serial_fd.read(1024).strip())
274
    finally:
275
      serial_fd.close()
276

  
277
  def Close(self):
278
    assert self.lock_fd, "Queue should be open"
279

  
280
    self.lock_fd.close()
281
    self.lock_fd = None
282

  
283
  def _InitQueueUnlocked(self):
284
    assert self.lock_fd, "Queue should be open"
285

  
286
    utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
287
                    data="%s\n" % constants.JOB_QUEUE_VERSION)
288
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
289
                    data="%s\n" % 0)
290

  
291
  def _NewSerialUnlocked(self):
292
    """Generates a new job identifier.
293

  
294
    Job identifiers are unique during the lifetime of a cluster.
295

  
296
    Returns: A string representing the job identifier.
297

  
298
    """
299
    assert self.lock_fd, "Queue should be open"
300

  
301
    # New number
302
    serial = self._last_serial + 1
303

  
304
    # Write to file
305
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
306
                    data="%s\n" % serial)
307

  
308
    # Keep it only if we were able to write the file
309
    self._last_serial = serial
310

  
311
    return serial
312

  
313
  def _GetJobPath(self, job_id):
314
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
315

  
316
  def _ListJobFiles(self):
317
    assert self.lock_fd, "Queue should be open"
318

  
319
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
320
            if self._RE_JOB_FILE.match(name)]
321

  
322
  def _LoadJobUnlocked(self, filepath):
323
    assert self.lock_fd, "Queue should be open"
324

  
325
    logging.debug("Loading job from %s", filepath)
326
    try:
327
      fd = open(filepath, "r")
328
    except IOError, err:
329
      if err.errno in (errno.ENOENT, ):
330
        return None
331
      raise
332
    try:
333
      data = serializer.LoadJson(fd.read())
334
    finally:
335
      fd.close()
336

  
337
    return _QueuedJob.Restore(self, data)
338

  
339
  def _GetJobsUnlocked(self, job_ids):
340
    if job_ids:
341
      files = [self._GetJobPath(job_id) for job_id in job_ids]
342
    else:
343
      files = [os.path.join(constants.QUEUE_DIR, filename)
344
               for filename in self._ListJobFiles()]
345

  
346
    return [self._LoadJobUnlocked(filepath) for filepath in files]
347

  
348
  @utils.LockedMethod
349
  def GetJobs(self, job_ids):
350
    return self._GetJobsUnlocked(job_ids)
351

  
352
  @utils.LockedMethod
353
  def AddJob(self, ops):
354
    assert self.lock_fd, "Queue should be open"
355

  
356
    # Get job identifier
357
    job_id = self._NewSerialUnlocked()
358
    job = _QueuedJob(self, job_id, ops)
359

  
360
    # Write to disk
361
    self._UpdateJobUnlocked(job)
362

  
363
    return job
364

  
365
  def _UpdateJobUnlocked(self, job):
366
    assert self.lock_fd, "Queue should be open"
367

  
368
    filename = self._GetJobPath(job.id)
369
    logging.debug("Writing job %s to %s", job.id, filename)
370
    utils.WriteFile(filename,
371
                    data=serializer.DumpJson(job.Serialize(), indent=False))
372

  
373
  @utils.LockedMethod
374
  def UpdateJob(self, job):
375
    return self._UpdateJobUnlocked(job)
376

  
377
  def ArchiveJob(self, job_id):
378
    raise NotImplementedError()
379

  
380

  
175 381
class JobQueue:
176 382
  """The job queue.
177 383

  
178 384
   """
179 385
  def __init__(self, context):
    """Start the job queue.

    Opens the persistent job storage, creates the worker pool, and
    resubmits jobs found still queued on disk; jobs left in the
    running state are marked unclean (the previous master daemon did
    not shut down properly).

    """
    self._lock = threading.Lock()
    self._jobs = JobStorage()
    self._wpool = _JobQueueWorkerPool(context)

    for job in self._jobs.GetJobs(None):
      status = job.GetStatus()
      if status in (constants.JOB_STATUS_QUEUED, ):
        self._wpool.AddTask(job)
      elif status in (constants.JOB_STATUS_RUNNING, ):
        logging.warning("Unfinished job %s found: %s", job.id, job)
        job.SetUnclean("Unclean master daemon shutdown")
193 398

  
399
  @utils.LockedMethod
194 400
  def SubmitJob(self, ops):
195 401
    """Add a new job to the queue.
196 402

  
......
201 407
    - ops: Sequence of opcodes
202 408

  
203 409
    """
204
    # Get job identifier
205
    self._lock.acquire()
206
    try:
207
      job_id = self._NewJobIdUnlocked()
208
    finally:
209
      self._lock.release()
210

  
211
    job = _QueuedJob(ops, job_id)
212

  
213
    # Add it to our internal queue
214
    self._lock.acquire()
215
    try:
216
      self._jobs[job_id] = job
217
    finally:
218
      self._lock.release()
410
    job = self._jobs.AddJob(ops)
219 411

  
220 412
    # Add to worker pool
221 413
    self._wpool.AddTask(job)
222 414

  
223
    return job_id
415
    return job.id
224 416

  
225 417
  def ArchiveJob(self, job_id):
    """Archiving jobs is not implemented yet."""
    raise NotImplementedError()
......
255 447
    """
256 448
    self._lock.acquire()
257 449
    try:
258
      if not job_ids:
259
        job_ids = self._jobs.keys()
260

  
261
      # TODO: define sort order?
262
      job_ids.sort()
263

  
264 450
      jobs = []
265 451

  
266
      for job_id in job_ids:
267
        job = self._jobs.get(job_id, None)
452
      for job in self._jobs.GetJobs(job_ids):
268 453
        if job is None:
269 454
          jobs.append(None)
270 455
        else:
......
274 459
    finally:
275 460
      self._lock.release()
276 461

  
462
  @utils.LockedMethod
  def Shutdown(self):
    """Stops the job queue.

    Terminates the workers first (they persist job state through the
    storage), then closes the job storage.

    """
    self._wpool.TerminateWorkers()
    self._jobs.Close()

Also available in: Unified diff