Small code style fix
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling."""
23
24 import threading
25 import Queue
26
27 from ganeti import opcodes
28 from ganeti import errors
29
30 class JobObject:
31   """In-memory job representation.
32
33   This is what we use to track the user-submitted jobs (which are of
34   class opcodes.Job).
35
36   """
37   def __init__(self, jid, jdesc):
38     self.data = jdesc
39     jdesc.status = opcodes.Job.STATUS_PENDING
40     jdesc.job_id = jid
41     self.lock = threading.Lock()
42
43   def SetStatus(self, status, result=None):
44     self.lock.acquire()
45     self.data.status = status
46     if result is not None:
47       self.data.result = result
48     self.lock.release()
49
50   def GetData(self):
51     self.lock.acquire()
52     #FIXME(iustin): make a deep copy of result
53     result = self.data
54     self.lock.release()
55     return result
56
57
58 class QueueManager:
59   """Example queue implementation.
60
61   """
62   def __init__(self):
63     self.job_queue = {}
64     self.jid = 1
65     self.lock = threading.Lock()
66     self.new_queue = Queue.Queue()
67
68   def put(self, item):
69     """Add a new job to the queue.
70
71     This enters the job into our job queue and also puts it on the new
72     queue, in order for it to be picked up by the queue processors.
73
74     """
75     self.lock.acquire()
76     try:
77       rid = self.jid
78       self.jid += 1
79       job = JobObject(rid, item)
80       self.job_queue[rid] = job
81     finally:
82       self.lock.release()
83     self.new_queue.put(job)
84     return rid
85
86   def query(self, rid):
87     """Query a given job ID.
88
89     """
90     self.lock.acquire()
91     result = self.job_queue.get(rid, None)
92     self.lock.release()
93     return result
94
95   def query_jobs(self, fields, names):
96     """Query all jobs.
97
98     The fields and names parameters are similar to the ones passed to
99     the OpQueryInstances.
100
101     """
102     result = []
103     self.lock.acquire()
104     if names:
105       values = [self.job_queue[j_id] for j_id in names]
106     else:
107       values = self.job_queue.itervalues()
108     try:
109       for jobj in values:
110         row = []
111         jdata = jobj.data
112         for fname in fields:
113           if fname == "id":
114             row.append(jdata.job_id)
115           elif fname == "status":
116             row.append(jdata.status)
117           elif fname == "opcodes":
118             row.append(",".join([op.OP_ID for op in jdata.op_list]))
119           else:
120             raise errors.OpExecError("Invalid job query field '%s'" %
121                                            fname)
122         result.append(row)
123     finally:
124       self.lock.release()
125     return result