Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 498ae1cc

History | View | Annotate | Download (2.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling."""
23

    
24
import threading
25
import Queue
26

    
27
from ganeti import opcodes
28

    
29
class JobObject:
30
  """In-memory job representation.
31

32
  This is what we use to track the user-submitted jobs (which are of
33
  class opcodes.Job).
34

35
  """
36
  def __init__(self, jid, jdesc):
37
    self.data = jdesc
38
    jdesc.status = opcodes.Job.STATUS_PENDING
39
    jdesc.job_id = jid
40
    self.lock = threading.Lock()
41

    
42
  def SetStatus(self, status, result=None):
43
    self.lock.acquire()
44
    self.data.status = status
45
    if result is not None:
46
      self.data.result = result
47
    self.lock.release()
48

    
49
  def GetData(self):
50
    self.lock.acquire()
51
    #FIXME(iustin): make a deep copy of result
52
    result = self.data
53
    self.lock.release()
54
    return result
55

    
56

    
57
class QueueManager:
58
  """Example queue implementation.
59

60
  """
61
  def __init__(self):
62
    self.job_queue = {}
63
    self.jid = 1
64
    self.lock = threading.Lock()
65
    self.new_queue = Queue.Queue()
66

    
67
  def put(self, item):
68
    """Add a new job to the queue.
69

70
    This enters the job into our job queue and also puts it on the new
71
    queue, in order for it to be picked up by the queue processors.
72

73
    """
74
    self.lock.acquire()
75
    try:
76
      rid = self.jid
77
      self.jid += 1
78
      job = JobObject(rid, item)
79
      self.job_queue[rid] = job
80
    finally:
81
      self.lock.release()
82
    self.new_queue.put(job)
83
    return rid
84

    
85
  def query(self, rid):
86
    """Query a given job ID.
87

88
    """
89
    self.lock.acquire()
90
    result = self.job_queue.get(rid, None)
91
    self.lock.release()
92
    return result