Add PID to all logs
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling."""
23
24 import logging
25 import threading
26
27 from ganeti import constants
28 from ganeti import workerpool
29 from ganeti import errors
30 from ganeti import mcpu
31 from ganeti import utils
32
33
34 JOBQUEUE_THREADS = 5
35
36
37 class _QueuedOpCode(object):
38   """Encasulates an opcode object.
39
40   Access is synchronized by the '_lock' attribute.
41
42   """
43   def __init__(self, op):
44     self.input = op
45     self.status = constants.OP_STATUS_QUEUED
46     self.result = None
47     self._lock = threading.Lock()
48
49   @utils.LockedMethod
50   def GetInput(self):
51     """Returns the original opcode.
52
53     """
54     return self.input
55
56   @utils.LockedMethod
57   def SetStatus(self, status, result):
58     """Update the opcode status and result.
59
60     """
61     self.status = status
62     self.result = result
63
64   @utils.LockedMethod
65   def GetStatus(self):
66     """Get the opcode status.
67
68     """
69     return self.status
70
71   @utils.LockedMethod
72   def GetResult(self):
73     """Get the opcode result.
74
75     """
76     return self.result
77
78
79 class _QueuedJob(object):
80   """In-memory job representation.
81
82   This is what we use to track the user-submitted jobs.
83
84   """
85   def __init__(self, ops, job_id):
86     if not ops:
87       # TODO
88       raise Exception("No opcodes")
89
90     self.id = job_id
91     self._lock = threading.Lock()
92
93     # _ops should not be modified again because we don't acquire the lock
94     # to use it.
95     self._ops = [_QueuedOpCode(op) for op in ops]
96
97   def GetStatus(self):
98     status = constants.JOB_STATUS_QUEUED
99
100     all_success = True
101     for op in self._ops:
102       op_status = op.GetStatus()
103       if op_status == constants.OP_STATUS_SUCCESS:
104         continue
105
106       all_success = False
107
108       if op_status == constants.OP_STATUS_QUEUED:
109         pass
110       elif op_status == constants.OP_STATUS_ERROR:
111         status = constants.JOB_STATUS_ERROR
112       elif op_status == constants.OP_STATUS_RUNNING:
113         status = constants.JOB_STATUS_RUNNING
114
115     if all_success:
116       status = constants.JOB_STATUS_SUCCESS
117
118     return status
119
120   def Run(self, proc):
121     """Job executor.
122
123     This functions processes a this job in the context of given processor
124     instance.
125
126     Args:
127     - proc: Ganeti Processor to run the job with
128
129     """
130     try:
131       count = len(self._ops)
132       for idx, op in enumerate(self._ops):
133         try:
134           logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
135           op.SetStatus(constants.OP_STATUS_RUNNING, None)
136
137           result = proc.ExecOpCode(op.input)
138
139           op.SetStatus(constants.OP_STATUS_SUCCESS, result)
140           logging.debug("Op %s/%s: Successfully finished %s",
141                         idx + 1, count, op)
142         except Exception, err:
143           op.SetStatus(constants.OP_STATUS_ERROR, str(err))
144           logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
145           raise
146
147     except errors.GenericError, err:
148       logging.error("ganeti exception %s", exc_info=err)
149     except Exception, err:
150       logging.error("unhandled exception %s", exc_info=err)
151     except:
152       logging.error("unhandled unknown exception %s", exc_info=err)
153
154
155 class _JobQueueWorker(workerpool.BaseWorker):
156   def RunTask(self, job):
157     logging.debug("Worker %s processing job %s",
158                   self.worker_id, job.id)
159     # TODO: feedback function
160     proc = mcpu.Processor(self.pool.context, feedback=lambda x: None)
161     try:
162       job.Run(proc)
163     finally:
164       logging.debug("Worker %s finished job %s, status = %s",
165                     self.worker_id, job.id, job.GetStatus())
166
167
168 class _JobQueueWorkerPool(workerpool.WorkerPool):
169   def __init__(self, context):
170     super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
171                                               _JobQueueWorker)
172     self.context = context
173
174
175 class JobQueue:
176   """The job queue.
177
178    """
179   def __init__(self, context):
180     self._lock = threading.Lock()
181     self._last_job_id = 0
182     self._jobs = {}
183     self._wpool = _JobQueueWorkerPool(context)
184
185   def _NewJobIdUnlocked(self):
186     """Generates a new job identifier.
187
188     Returns: A string representing the job identifier.
189
190     """
191     self._last_job_id += 1
192     return str(self._last_job_id)
193
194   def SubmitJob(self, ops):
195     """Add a new job to the queue.
196
197     This enters the job into our job queue and also puts it on the new
198     queue, in order for it to be picked up by the queue processors.
199
200     Args:
201     - ops: Sequence of opcodes
202
203     """
204     # Get job identifier
205     self._lock.acquire()
206     try:
207       job_id = self._NewJobIdUnlocked()
208     finally:
209       self._lock.release()
210
211     job = _QueuedJob(ops, job_id)
212
213     # Add it to our internal queue
214     self._lock.acquire()
215     try:
216       self._jobs[job_id] = job
217     finally:
218       self._lock.release()
219
220     # Add to worker pool
221     self._wpool.AddTask(job)
222
223     return job_id
224
225   def ArchiveJob(self, job_id):
226     raise NotImplementedError()
227
228   def CancelJob(self, job_id):
229     raise NotImplementedError()
230
231   def _GetJobInfo(self, job, fields):
232     row = []
233     for fname in fields:
234       if fname == "id":
235         row.append(job.id)
236       elif fname == "status":
237         row.append(job.GetStatus())
238       elif fname == "ops":
239         row.append([op.GetInput().__getstate__() for op in job._ops])
240       elif fname == "opresult":
241         row.append([op.GetResult() for op in job._ops])
242       elif fname == "opstatus":
243         row.append([op.GetStatus() for op in job._ops])
244       else:
245         raise errors.OpExecError("Invalid job query field '%s'" % fname)
246     return row
247
248   def QueryJobs(self, job_ids, fields):
249     """Returns a list of jobs in queue.
250
251     Args:
252     - job_ids: Sequence of job identifiers or None for all
253     - fields: Names of fields to return
254
255     """
256     self._lock.acquire()
257     try:
258       if not job_ids:
259         job_ids = self._jobs.keys()
260
261       # TODO: define sort order?
262       job_ids.sort()
263
264       jobs = []
265
266       for job_id in job_ids:
267         job = self._jobs.get(job_id, None)
268         if job is None:
269           jobs.append(None)
270         else:
271           jobs.append(self._GetJobInfo(job, fields))
272
273       return jobs
274     finally:
275       self._lock.release()
276
277   def Shutdown(self):
278     """Stops the job queue.
279
280     """
281     self._wpool.TerminateWorkers()