4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the job queue handling."""
27 from ganeti import opcodes
28 from ganeti import errors
31 """In-memory job representation.
33 This is what we use to track the user-submitted jobs (which are of
37 def __init__(self, jid, jdesc):
# Initialise the bookkeeping fields on the job description `jdesc` and
# create the lock stored on this object.
# NOTE(review): the embedded listing numbers jump 37 -> 39 -> 41, so
# statements appear to be missing from this view; in particular `jid` is
# not used by any visible line -- confirm against the full file.
# The overall job status starts out as pending.
39 jdesc.status = opcodes.Job.STATUS_PENDING
# One pending status entry and one empty (None) result slot for each
# element of jdesc.op_list.
41 jdesc.op_status = [opcodes.Job.STATUS_PENDING for i in jdesc.op_list]
42 jdesc.op_result = [None for i in jdesc.op_list]
# Lock object kept on the instance; the code that takes it is not visible
# in this block.
43 self.lock = threading.Lock()
45 def SetStatus(self, status, result=None):
# Store a new status on the wrapped job data and, optionally, replace its
# per-operation results.
#   status: value written to self.data.status
#   result: when not None, replaces self.data.op_result wholesale; the
#           default None leaves the stored results untouched
# NOTE(review): the embedded listing numbers skip 46 and 50, so statements
# appear to be missing from this view -- presumably the self.lock
# acquire/release around the update; confirm against the full file.
# NOTE(review): no visible line assigns self.data; verify where it is set.
47 self.data.status = status
48 if result is not None:
49 self.data.op_result = result
54 #FIXME(iustin): make a deep copy of result
61 """Example queue implementation.
67 self.lock = threading.Lock()
68 self.new_queue = Queue.Queue()
71 """Add a new job to the queue.
73 This enters the job into our job queue and also puts it on the new
74 queue, in order for it to be picked up by the queue processors.
81 job = JobObject(rid, item)
82 self.job_queue[rid] = job
85 self.new_queue.put(job)
89 """Query a given job ID.
93 result = self.job_queue.get(rid, None)
97 def query_jobs(self, fields, names):
100 The fields and names parameters are similar to the ones passed to
101 the OpQueryInstances.
107 values = [self.job_queue[j_id] for j_id in names]
109 values = self.job_queue.itervalues()
116 row.append(jdata.job_id)
117 elif fname == "status":
118 row.append(jdata.status)
119 elif fname == "op_list":
120 row.append([op.__getstate__() for op in jdata.op_list])
121 elif fname == "op_status":
122 row.append(jdata.op_status)
123 elif fname == "op_result":
124 row.append(jdata.op_result)
126 raise errors.OpExecError("Invalid job query field '%s'" %