lib/jqueue.py @ ac0930b9

#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling."""

import os
import logging
import threading
import errno
import re

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils


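# Size of the worker pool used by the job queue; each worker thread
# processes one job at a time.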
JOBQUEUE_THREADS = 5


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  Access is synchronized by the '_lock' attribute.

  """
  def __init__(self, op):
    self.__Setup(op, constants.OP_STATUS_QUEUED, None)

  def __Setup(self, input, status, result):
    self._lock = threading.Lock()
    self.input = input
    self.status = status
    self.result = result

  @classmethod
  def Restore(cls, state):
    obj = object.__new__(cls)
    obj.__Setup(opcodes.OpCode.LoadOpCode(state["input"]),
                state["status"], state["result"])
    return obj

  @utils.LockedMethod
  def Serialize(self):
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      }

  @utils.LockedMethod
  def GetInput(self):
    """Returns the original opcode.

    """
    return self.input

  @utils.LockedMethod
  def SetStatus(self, status, result):
    """Update the opcode status and result.

    """
    self.status = status
    self.result = result

  @utils.LockedMethod
  def GetStatus(self):
    """Get the opcode status.

    """
    return self.status

  @utils.LockedMethod
  def GetResult(self):
    """Get the opcode result.

    """
    return self.result


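# Illustration (not part of the original module): a _QueuedOpCode is
# persisted as a plain dict and can be rebuilt from it, which is how
# jobs survive on disk, e.g. for any _QueuedOpCode instance qop:
#   state = qop.Serialize()
#   qop2 = _QueuedOpCode.Restore(state)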
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs.

  """
  def __init__(self, storage, job_id, ops):
    if not ops:
      # TODO
      raise Exception("No opcodes")

    self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops])

  def __Setup(self, storage, job_id, ops):
    self.storage = storage
    self.id = job_id
    self._ops = ops

  @classmethod
  def Restore(cls, storage, state):
    obj = object.__new__(cls)
    obj.__Setup(storage, state["id"],
                [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]])
    return obj

  def Serialize(self):
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self._ops],
      }

  def SetUnclean(self, msg):
    try:
      for op in self._ops:
        op.SetStatus(constants.OP_STATUS_ERROR, msg)
    finally:
      self.storage.UpdateJob(self)

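  # Note: the overall job status is derived from the opcode statuses:
  # one failed opcode fails the whole job, a running opcode makes the
  # job running, and the job is successful only if all opcodes are.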
  def GetStatus(self):
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self._ops:
      op_status = op.GetStatus()
      if op_status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op_status == constants.OP_STATUS_QUEUED:
        pass
      elif op_status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op_status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

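  # The job state is written back to storage after every opcode status
  # change, so a restarted master daemon can requeue pending jobs and
  # flag interrupted ones (see JobQueue.__init__ below).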
  def Run(self, proc):
    """Job executor.

    This function processes this job in the context of the given
    processor instance.

    Args:
    - proc: Ganeti Processor to run the job with

    """
    try:
      count = len(self._ops)
      for idx, op in enumerate(self._ops):
        try:
          logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
          op.SetStatus(constants.OP_STATUS_RUNNING, None)
          self.storage.UpdateJob(self)

          result = proc.ExecOpCode(op.input)

          op.SetStatus(constants.OP_STATUS_SUCCESS, result)
          self.storage.UpdateJob(self)
          logging.debug("Op %s/%s: Successfully finished %s",
                        idx + 1, count, op)
        except Exception, err:
          try:
            op.SetStatus(constants.OP_STATUS_ERROR, str(err))
            logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
          finally:
            self.storage.UpdateJob(self)
          raise

    except errors.GenericError:
      logging.exception("ganeti exception")
    except Exception:
      logging.exception("unhandled exception")
    except:
      logging.exception("unhandled unknown exception")


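# Worker threads: each worker takes one job off the queue and runs all
# of its opcodes sequentially through an mcpu.Processor.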
class _JobQueueWorker(workerpool.BaseWorker):
  def RunTask(self, job):
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    # TODO: feedback function
    proc = mcpu.Processor(self.pool.context, feedback=lambda x: None)
    try:
      job.Run(proc)
    finally:
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job.id, job.GetStatus())


class _JobQueueWorkerPool(workerpool.WorkerPool):
  def __init__(self, context):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.context = context


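# On-disk layout used by JobStorage: one JSON file per job
# ("job-<serial>") in QUEUE_DIR, plus a version file, a serial file
# holding the last used job ID, and a lock file guarding the queue.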
class JobStorage(object):
  _RE_JOB_FILE = re.compile(r"^job-(\d+)$")

  def __init__(self):
    self._lock = threading.Lock()
    self._memcache = {}

    # Make sure our directory exists
    try:
      os.mkdir(constants.QUEUE_DIR, 0700)
    except OSError, err:
      if err.errno not in (errno.EEXIST, ):
        raise

    # Get queue lock
    self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
    try:
      utils.LockFile(self.lock_fd)
    except:
      self.lock_fd.close()
      raise

    # Read version
    try:
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
    except IOError, err:
      if err.errno not in (errno.ENOENT, ):
        raise

      # Setup a new queue
      self._InitQueueUnlocked()

      # Try to open again
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")

    try:
      # Try to read version
      version = int(version_fd.read(128))

      # Verify version
      if version != constants.JOB_QUEUE_VERSION:
        raise errors.JobQueueError("Found version %s, expected %s" %
                                   (version, constants.JOB_QUEUE_VERSION))
    finally:
      version_fd.close()

    serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
    try:
      # Read last serial
      self._last_serial = int(serial_fd.read(1024).strip())
    finally:
      serial_fd.close()

  def Close(self):
    assert self.lock_fd, "Queue should be open"

    self.lock_fd.close()
    self.lock_fd = None

  def _InitQueueUnlocked(self):
    assert self.lock_fd, "Queue should be open"

    utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
                    data="%s\n" % constants.JOB_QUEUE_VERSION)
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                    data="%s\n" % 0)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: The new job identifier (serial number).

    """
    assert self.lock_fd, "Queue should be open"

    # New number
    serial = self._last_serial + 1

    # Write to file
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                    data="%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return serial

  def _GetJobPath(self, job_id):
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    """
    jfiles = self._ListJobFiles()
    return [int(m.group(1)) for m in
            [self._RE_JOB_FILE.match(name) for name in jfiles]]

  def _ListJobFiles(self):
    assert self.lock_fd, "Queue should be open"

    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    assert self.lock_fd, "Queue should be open"

    if job_id in self._memcache:
      logging.debug("Found job %d in memcache", job_id)
      return self._memcache[job_id]

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    job = _QueuedJob.Restore(self, data)
    self._memcache[job_id] = job
    logging.debug("Added job %d to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @utils.LockedMethod
  def GetJobs(self, job_ids):
    return self._GetJobsUnlocked(job_ids)

  @utils.LockedMethod
  def AddJob(self, ops):
    assert self.lock_fd, "Queue should be open"

    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self._UpdateJobUnlocked(job)

    logging.debug("Added new job %d to the cache", job_id)
    self._memcache[job_id] = job

    return job

  def _UpdateJobUnlocked(self, job):
    assert self.lock_fd, "Queue should be open"

    filename = self._GetJobPath(job.id)
    logging.debug("Writing job %s to %s", job.id, filename)
    utils.WriteFile(filename,
                    data=serializer.DumpJson(job.Serialize(), indent=False))
    self._CleanCacheUnlocked(exceptions=[job.id])

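  # Finished jobs are evicted from the memory cache; they can always be
  # reloaded from their file on disk if queried again.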
  def _CleanCacheUnlocked(self, exceptions=None):
    """Clean the memory cache.

    The exceptions argument contains job IDs that should not be
    cleaned.

    """
    if exceptions is None:
      exceptions = []
    assert isinstance(exceptions, list)
    for job in self._memcache.values():
      if job.id in exceptions:
        continue
      if job.GetStatus() not in (constants.JOB_STATUS_QUEUED,
                                 constants.JOB_STATUS_RUNNING):
        logging.debug("Cleaning job %d from the cache", job.id)
        try:
          del self._memcache[job.id]
        except KeyError:
          pass

  @utils.LockedMethod
  def UpdateJob(self, job):
    return self._UpdateJobUnlocked(job)

  def ArchiveJob(self, job_id):
    raise NotImplementedError()


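# Illustration (not part of the original module): typical use by the
# master daemon, assuming a suitable context object and opcode list:
#   queue = JobQueue(context)
#   job_id = queue.SubmitJob(ops)
#   queue.QueryJobs([job_id], ["id", "status"])
#   queue.Shutdown()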
class JobQueue(object):
  """The job queue.

  """
  def __init__(self, context):
    self._lock = threading.Lock()
    self._jobs = JobStorage()
    self._wpool = _JobQueueWorkerPool(context)

    for job in self._jobs.GetJobs(None):
      status = job.GetStatus()
      if status in (constants.JOB_STATUS_QUEUED, ):
        self._wpool.AddTask(job)

      elif status in (constants.JOB_STATUS_RUNNING, ):
        logging.warning("Unfinished job %s found: %s", job.id, job)
        job.SetUnclean("Unclean master daemon shutdown")

  @utils.LockedMethod
  def SubmitJob(self, ops):
    """Add a new job to the queue.

    This enters the job into our job queue and also adds it to the
    worker pool, in order for it to be picked up by the job processors.

    Args:
    - ops: Sequence of opcodes

    """
    job = self._jobs.AddJob(ops)

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  def ArchiveJob(self, job_id):
    raise NotImplementedError()

  def CancelJob(self, job_id):
    raise NotImplementedError()

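  # Supported query fields: "id", "status", "ops", "opresult" and
  # "opstatus"; anything else is rejected with OpExecError.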
  def _GetJobInfo(self, job, fields):
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.GetStatus())
      elif fname == "ops":
        row.append([op.GetInput().__getstate__() for op in job._ops])
      elif fname == "opresult":
        row.append([op.GetResult() for op in job._ops])
      elif fname == "opstatus":
        row.append([op.GetStatus() for op in job._ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in the queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
    self._lock.acquire()
    try:
      jobs = []

      for job in self._jobs.GetJobs(job_ids):
        if job is None:
          jobs.append(None)
        else:
          jobs.append(self._GetJobInfo(job, fields))

      return jobs
    finally:
      self._lock.release()

  @utils.LockedMethod
  def Shutdown(self):
    """Stops the job queue.

    """
    self._wpool.TerminateWorkers()
    self._jobs.Close()