4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the job queue handling."""
27 from ganeti import constants
28 from ganeti import workerpool
29 from ganeti import errors
30 from ganeti import mcpu
31 from ganeti import utils
37 class _QueuedOpCode(object):
38 """Encapsulates an opcode object.
40 Access is synchronized by the '_lock' attribute.
43 def __init__(self, op):
45 self.status = constants.OP_STATUS_QUEUED
47 self._lock = threading.Lock()
51 """Returns the original opcode.
57 def SetStatus(self, status, result):
58 """Update the opcode status and result.
66 """Get the opcode status.
73 """Get the opcode result.
79 class _QueuedJob(object):
80 """In-memory job representation.
82 This is what we use to track the user-submitted jobs.
85 def __init__(self, ops, job_id):
88 raise Exception("No opcodes")
91 self._lock = threading.Lock()
93 # _ops should not be modified again because we don't acquire the lock
95 self._ops = [_QueuedOpCode(op) for op in ops]
98 status = constants.JOB_STATUS_QUEUED
102 op_status = op.GetStatus()
103 if op_status == constants.OP_STATUS_SUCCESS:
108 if op_status == constants.OP_STATUS_QUEUED:
110 elif op_status == constants.OP_STATUS_ERROR:
111 status = constants.JOB_STATUS_ERROR
112 elif op_status == constants.OP_STATUS_RUNNING:
113 status = constants.JOB_STATUS_RUNNING
116 status = constants.JOB_STATUS_SUCCESS
123 This function processes this job in the context of the given processor
127 - proc: Ganeti Processor to run the job with
131 count = len(self._ops)
132 for idx, op in enumerate(self._ops):
134 logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
135 op.SetStatus(constants.OP_STATUS_RUNNING, None)
137 result = proc.ExecOpCode(op.input)
139 op.SetStatus(constants.OP_STATUS_SUCCESS, result)
140 logging.debug("Op %s/%s: Successfully finished %s",
142 except Exception, err:
143 op.SetStatus(constants.OP_STATUS_ERROR, str(err))
144 logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
147 except errors.GenericError, err:
148 logging.error("ganeti exception %s", exc_info=err)
149 except Exception, err:
150 logging.error("unhandled exception %s", exc_info=err)
152 logging.error("unhandled unknown exception %s", exc_info=err)
155 class _JobQueueWorker(workerpool.BaseWorker):
156 def RunTask(self, job):
157 logging.debug("Worker %s processing job %s",
158 self.worker_id, job.id)
159 # TODO: feedback function
160 proc = mcpu.Processor(self.pool.context, feedback=lambda x: None)
164 logging.debug("Worker %s finished job %s, status = %s",
165 self.worker_id, job.id, job.GetStatus())
168 class _JobQueueWorkerPool(workerpool.WorkerPool):
169 def __init__(self, context):
170 super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
172 self.context = context
179 def __init__(self, context):
180 self._lock = threading.Lock()
181 self._last_job_id = 0
183 self._wpool = _JobQueueWorkerPool(context)
185 def _NewJobIdUnlocked(self):
186 """Generates a new job identifier.
188 Returns: A string representing the job identifier.
191 self._last_job_id += 1
192 return str(self._last_job_id)
194 def SubmitJob(self, ops):
195 """Add a new job to the queue.
197 This enters the job into our job queue and also puts it on the new
198 queue, in order for it to be picked up by the queue processors.
201 - ops: Sequence of opcodes
207 job_id = self._NewJobIdUnlocked()
211 job = _QueuedJob(ops, job_id)
213 # Add it to our internal queue
216 self._jobs[job_id] = job
221 self._wpool.AddTask(job)
225 def ArchiveJob(self, job_id):
226 raise NotImplementedError()
228 def CancelJob(self, job_id):
229 raise NotImplementedError()
231 def _GetJobInfo(self, job, fields):
236 elif fname == "status":
237 row.append(job.GetStatus())
239 row.append([op.GetInput().__getstate__() for op in job._ops])
240 elif fname == "opresult":
241 row.append([op.GetResult() for op in job._ops])
242 elif fname == "opstatus":
243 row.append([op.GetStatus() for op in job._ops])
245 raise errors.OpExecError("Invalid job query field '%s'" % fname)
248 def QueryJobs(self, job_ids, fields):
249 """Returns a list of jobs in queue.
252 - job_ids: Sequence of job identifiers or None for all
253 - fields: Names of fields to return
259 job_ids = self._jobs.keys()
261 # TODO: define sort order?
266 for job_id in job_ids:
267 job = self._jobs.get(job_id, None)
271 jobs.append(self._GetJobInfo(job, fields))
278 """Stops the job queue.
281 self._wpool.TerminateWorkers()