Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 1d544ba3

History | View | Annotate | Download (2.3 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 498ae1cc Iustin Pop
# Copyright (C) 2006, 2007 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 498ae1cc Iustin Pop
"""Module implementing the job queue handling."""
23 498ae1cc Iustin Pop
24 498ae1cc Iustin Pop
import threading
25 498ae1cc Iustin Pop
import Queue
26 498ae1cc Iustin Pop
27 498ae1cc Iustin Pop
from ganeti import opcodes
28 498ae1cc Iustin Pop
29 498ae1cc Iustin Pop
class JobObject:
30 498ae1cc Iustin Pop
  """In-memory job representation.
31 498ae1cc Iustin Pop

32 498ae1cc Iustin Pop
  This is what we use to track the user-submitted jobs (which are of
33 498ae1cc Iustin Pop
  class opcodes.Job).
34 498ae1cc Iustin Pop

35 498ae1cc Iustin Pop
  """
36 498ae1cc Iustin Pop
  def __init__(self, jid, jdesc):
37 498ae1cc Iustin Pop
    self.data = jdesc
38 498ae1cc Iustin Pop
    jdesc.status = opcodes.Job.STATUS_PENDING
39 498ae1cc Iustin Pop
    jdesc.job_id = jid
40 498ae1cc Iustin Pop
    self.lock = threading.Lock()
41 498ae1cc Iustin Pop
42 498ae1cc Iustin Pop
  def SetStatus(self, status, result=None):
43 498ae1cc Iustin Pop
    self.lock.acquire()
44 498ae1cc Iustin Pop
    self.data.status = status
45 498ae1cc Iustin Pop
    if result is not None:
46 498ae1cc Iustin Pop
      self.data.result = result
47 498ae1cc Iustin Pop
    self.lock.release()
48 498ae1cc Iustin Pop
49 498ae1cc Iustin Pop
  def GetData(self):
50 498ae1cc Iustin Pop
    self.lock.acquire()
51 498ae1cc Iustin Pop
    #FIXME(iustin): make a deep copy of result
52 498ae1cc Iustin Pop
    result = self.data
53 498ae1cc Iustin Pop
    self.lock.release()
54 498ae1cc Iustin Pop
    return result
55 498ae1cc Iustin Pop
56 498ae1cc Iustin Pop
57 498ae1cc Iustin Pop
class QueueManager:
58 498ae1cc Iustin Pop
  """Example queue implementation.
59 498ae1cc Iustin Pop

60 498ae1cc Iustin Pop
  """
61 498ae1cc Iustin Pop
  def __init__(self):
62 498ae1cc Iustin Pop
    self.job_queue = {}
63 498ae1cc Iustin Pop
    self.jid = 1
64 498ae1cc Iustin Pop
    self.lock = threading.Lock()
65 498ae1cc Iustin Pop
    self.new_queue = Queue.Queue()
66 498ae1cc Iustin Pop
67 498ae1cc Iustin Pop
  def put(self, item):
68 498ae1cc Iustin Pop
    """Add a new job to the queue.
69 498ae1cc Iustin Pop

70 498ae1cc Iustin Pop
    This enters the job into our job queue and also puts it on the new
71 498ae1cc Iustin Pop
    queue, in order for it to be picked up by the queue processors.
72 498ae1cc Iustin Pop

73 498ae1cc Iustin Pop
    """
74 498ae1cc Iustin Pop
    self.lock.acquire()
75 498ae1cc Iustin Pop
    try:
76 498ae1cc Iustin Pop
      rid = self.jid
77 498ae1cc Iustin Pop
      self.jid += 1
78 498ae1cc Iustin Pop
      job = JobObject(rid, item)
79 498ae1cc Iustin Pop
      self.job_queue[rid] = job
80 498ae1cc Iustin Pop
    finally:
81 498ae1cc Iustin Pop
      self.lock.release()
82 498ae1cc Iustin Pop
    self.new_queue.put(job)
83 498ae1cc Iustin Pop
    return rid
84 498ae1cc Iustin Pop
85 498ae1cc Iustin Pop
  def query(self, rid):
86 498ae1cc Iustin Pop
    """Query a given job ID.
87 498ae1cc Iustin Pop

88 498ae1cc Iustin Pop
    """
89 498ae1cc Iustin Pop
    self.lock.acquire()
90 498ae1cc Iustin Pop
    result = self.job_queue.get(rid, None)
91 498ae1cc Iustin Pop
    self.lock.release()
92 498ae1cc Iustin Pop
    return result