Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ b74159ee

History | View | Annotate | Download (3.4 kB)

1 498ae1cc Iustin Pop
#
2 498ae1cc Iustin Pop
#
3 498ae1cc Iustin Pop
4 498ae1cc Iustin Pop
# Copyright (C) 2006, 2007 Google Inc.
5 498ae1cc Iustin Pop
#
6 498ae1cc Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 498ae1cc Iustin Pop
# it under the terms of the GNU General Public License as published by
8 498ae1cc Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 498ae1cc Iustin Pop
# (at your option) any later version.
10 498ae1cc Iustin Pop
#
11 498ae1cc Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 498ae1cc Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 498ae1cc Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 498ae1cc Iustin Pop
# General Public License for more details.
15 498ae1cc Iustin Pop
#
16 498ae1cc Iustin Pop
# You should have received a copy of the GNU General Public License
17 498ae1cc Iustin Pop
# along with this program; if not, write to the Free Software
18 498ae1cc Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 498ae1cc Iustin Pop
# 02110-1301, USA.
20 498ae1cc Iustin Pop
21 498ae1cc Iustin Pop
22 498ae1cc Iustin Pop
"""Module implementing the job queue handling."""
23 498ae1cc Iustin Pop
24 498ae1cc Iustin Pop
import threading
25 498ae1cc Iustin Pop
import Queue
26 498ae1cc Iustin Pop
27 498ae1cc Iustin Pop
from ganeti import opcodes
28 7a1ecaed Iustin Pop
from ganeti import errors
29 498ae1cc Iustin Pop
30 498ae1cc Iustin Pop
class JobObject:
31 498ae1cc Iustin Pop
  """In-memory job representation.
32 498ae1cc Iustin Pop

33 498ae1cc Iustin Pop
  This is what we use to track the user-submitted jobs (which are of
34 498ae1cc Iustin Pop
  class opcodes.Job).
35 498ae1cc Iustin Pop

36 498ae1cc Iustin Pop
  """
37 498ae1cc Iustin Pop
  def __init__(self, jid, jdesc):
38 498ae1cc Iustin Pop
    self.data = jdesc
39 498ae1cc Iustin Pop
    jdesc.status = opcodes.Job.STATUS_PENDING
40 498ae1cc Iustin Pop
    jdesc.job_id = jid
41 35049ff2 Iustin Pop
    jdesc.op_status = [opcodes.Job.STATUS_PENDING for i in jdesc.op_list]
42 35049ff2 Iustin Pop
    jdesc.op_result = [None for i in jdesc.op_list]
43 498ae1cc Iustin Pop
    self.lock = threading.Lock()
44 498ae1cc Iustin Pop
45 498ae1cc Iustin Pop
  def SetStatus(self, status, result=None):
46 498ae1cc Iustin Pop
    self.lock.acquire()
47 498ae1cc Iustin Pop
    self.data.status = status
48 498ae1cc Iustin Pop
    if result is not None:
49 498ae1cc Iustin Pop
      self.data.result = result
50 498ae1cc Iustin Pop
    self.lock.release()
51 498ae1cc Iustin Pop
52 498ae1cc Iustin Pop
  def GetData(self):
53 498ae1cc Iustin Pop
    self.lock.acquire()
54 498ae1cc Iustin Pop
    #FIXME(iustin): make a deep copy of result
55 498ae1cc Iustin Pop
    result = self.data
56 498ae1cc Iustin Pop
    self.lock.release()
57 498ae1cc Iustin Pop
    return result
58 498ae1cc Iustin Pop
59 498ae1cc Iustin Pop
60 498ae1cc Iustin Pop
class QueueManager:
61 498ae1cc Iustin Pop
  """Example queue implementation.
62 498ae1cc Iustin Pop

63 498ae1cc Iustin Pop
  """
64 498ae1cc Iustin Pop
  def __init__(self):
65 498ae1cc Iustin Pop
    self.job_queue = {}
66 498ae1cc Iustin Pop
    self.jid = 1
67 498ae1cc Iustin Pop
    self.lock = threading.Lock()
68 498ae1cc Iustin Pop
    self.new_queue = Queue.Queue()
69 498ae1cc Iustin Pop
70 498ae1cc Iustin Pop
  def put(self, item):
71 498ae1cc Iustin Pop
    """Add a new job to the queue.
72 498ae1cc Iustin Pop

73 498ae1cc Iustin Pop
    This enters the job into our job queue and also puts it on the new
74 498ae1cc Iustin Pop
    queue, in order for it to be picked up by the queue processors.
75 498ae1cc Iustin Pop

76 498ae1cc Iustin Pop
    """
77 498ae1cc Iustin Pop
    self.lock.acquire()
78 498ae1cc Iustin Pop
    try:
79 498ae1cc Iustin Pop
      rid = self.jid
80 498ae1cc Iustin Pop
      self.jid += 1
81 498ae1cc Iustin Pop
      job = JobObject(rid, item)
82 498ae1cc Iustin Pop
      self.job_queue[rid] = job
83 498ae1cc Iustin Pop
    finally:
84 498ae1cc Iustin Pop
      self.lock.release()
85 498ae1cc Iustin Pop
    self.new_queue.put(job)
86 498ae1cc Iustin Pop
    return rid
87 498ae1cc Iustin Pop
88 498ae1cc Iustin Pop
  def query(self, rid):
89 498ae1cc Iustin Pop
    """Query a given job ID.
90 498ae1cc Iustin Pop

91 498ae1cc Iustin Pop
    """
92 498ae1cc Iustin Pop
    self.lock.acquire()
93 498ae1cc Iustin Pop
    result = self.job_queue.get(rid, None)
94 498ae1cc Iustin Pop
    self.lock.release()
95 498ae1cc Iustin Pop
    return result
96 7a1ecaed Iustin Pop
97 7a1ecaed Iustin Pop
  def query_jobs(self, fields, names):
98 7a1ecaed Iustin Pop
    """Query all jobs.
99 7a1ecaed Iustin Pop

100 7a1ecaed Iustin Pop
    The fields and names parameters are similar to the ones passed to
101 7a1ecaed Iustin Pop
    the OpQueryInstances.
102 7a1ecaed Iustin Pop

103 7a1ecaed Iustin Pop
    """
104 7a1ecaed Iustin Pop
    result = []
105 7a1ecaed Iustin Pop
    self.lock.acquire()
106 283439c9 Iustin Pop
    if names:
107 283439c9 Iustin Pop
      values = [self.job_queue[j_id] for j_id in names]
108 283439c9 Iustin Pop
    else:
109 283439c9 Iustin Pop
      values = self.job_queue.itervalues()
110 7a1ecaed Iustin Pop
    try:
111 283439c9 Iustin Pop
      for jobj in values:
112 7a1ecaed Iustin Pop
        row = []
113 7a1ecaed Iustin Pop
        jdata = jobj.data
114 7a1ecaed Iustin Pop
        for fname in fields:
115 7a1ecaed Iustin Pop
          if fname == "id":
116 7a1ecaed Iustin Pop
            row.append(jdata.job_id)
117 7a1ecaed Iustin Pop
          elif fname == "status":
118 7a1ecaed Iustin Pop
            row.append(jdata.status)
119 35049ff2 Iustin Pop
          elif fname == "op_list":
120 35049ff2 Iustin Pop
            row.append([op.__getstate__() for op in jdata.op_list])
121 35049ff2 Iustin Pop
          elif fname == "op_status":
122 35049ff2 Iustin Pop
            row.append(jdata.op_status)
123 35049ff2 Iustin Pop
          elif fname == "op_result":
124 35049ff2 Iustin Pop
            row.append(jdata.op_result)
125 7a1ecaed Iustin Pop
          else:
126 7a1ecaed Iustin Pop
            raise errors.OpExecError("Invalid job query field '%s'" %
127 7a1ecaed Iustin Pop
                                           fname)
128 7a1ecaed Iustin Pop
        result.append(row)
129 7a1ecaed Iustin Pop
    finally:
130 7a1ecaed Iustin Pop
      self.lock.release()
131 7a1ecaed Iustin Pop
    return result