Revision e2715f69

b/lib/jqueue.py
21 21

  
22 22
"""Module implementing the job queue handling."""
23 23

  
24
import threading
24
import logging
25 25
import Queue
26
import threading
26 27

  
28
from ganeti import constants
29
from ganeti import workerpool
27 30
from ganeti import opcodes
28 31
from ganeti import errors
32
from ganeti import mcpu
33

  
34

  
35
# Passed by _JobQueueWorkerPool as the first argument to the
# workerpool.WorkerPool constructor; presumably the number of worker
# threads processing jobs in parallel -- confirm against workerpool.
JOBQUEUE_THREADS = 5
36

  
29 37

  
30 38
class JobObject:
31 39
  """In-memory job representation.
......
129 137
    finally:
130 138
      self.lock.release()
131 139
    return result
140

  
141

  
142
class _QueuedOpCode(object):
  """Encapsulates an opcode object together with its execution state.

  Each instance keeps the submitted opcode (input), its current status
  and the result slot, which starts out empty.

  Access must be synchronized by using an external lock.

  """
  def __init__(self, op):
    """Wraps an opcode and marks it as queued.

    Args:
    - op: The opcode object to wrap

    """
    # Nothing has been executed yet, so there is no result to report.
    self.result = None
    self.status = constants.OP_STATUS_QUEUED
    self.input = op
152

  
153

  
154
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. A job owns a
  fixed list of _QueuedOpCode objects; their status and result fields
  are protected by the job's own lock.

  """
  def __init__(self, ops, job_id):
    """Creates a job from a non-empty sequence of opcodes.

    Args:
    - ops: Sequence of opcodes making up the job
    - job_id: Identifier assigned to this job

    Raises:
    - Exception: if no opcodes were given

    """
    if not ops:
      # TODO
      raise Exception("No opcodes")

    self.id = job_id
    self._lock = threading.Lock()

    # _ops should not be modified again because we don't acquire the lock
    # to use it.
    self._ops = [_QueuedOpCode(op) for op in ops]

  def _GetStatusUnlocked(self):
    """Computes the job status from the status of its opcodes.

    The caller must hold the job lock.

    Returns: JOB_STATUS_SUCCESS if every opcode succeeded; otherwise the
    status derived from the last opcode that is in error or running
    state, or JOB_STATUS_QUEUED if there is no such opcode.

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self._ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        # Queued opcodes do not influence the job status
        pass
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def GetStatus(self):
    """Returns the current job status, taking the job lock.

    """
    self._lock.acquire()
    try:
      return self._GetStatusUnlocked()
    finally:
      self._lock.release()

  def Run(self, proc):
    """Job executor.

    This function processes this job in the context of the given
    processor instance. Opcodes are executed in order; the first failing
    opcode is marked as failed (its result is set to the error message)
    and the remaining opcodes are left untouched. Exceptions are logged
    and never propagated to the caller.

    Args:
    - proc: Ganeti Processor to run the job with

    """
    # Imported locally; only needed to look up the exception currently
    # being handled without relying on version-specific "except" syntax.
    import sys

    try:
      for op in self._ops:
        try:
          self._lock.acquire()
          try:
            op.status = constants.OP_STATUS_RUNNING
          finally:
            self._lock.release()

          # The lock is not held while the opcode runs, so that status
          # queries are not blocked for the duration of the execution.
          result = proc.ExecOpCode(op.input)

          self._lock.acquire()
          try:
            op.status = constants.OP_STATUS_SUCCESS
            op.result = result
          finally:
            self._lock.release()
        except Exception:
          err = sys.exc_info()[1]
          self._lock.acquire()
          try:
            op.status = constants.OP_STATUS_ERROR
            op.result = str(err)
          finally:
            self._lock.release()
          # Abort the job; the handlers below take care of logging
          raise

    # exc_info=True makes the logging module pick up the exception being
    # handled. The final catch-all handler has no exception variable to
    # refer to, and the messages take no format arguments.
    except errors.GenericError:
      logging.error("ganeti exception", exc_info=True)
    except Exception:
      logging.error("unhandled exception", exc_info=True)
    except:
      # Deliberate catch-all: a worker must never die because of a job
      logging.error("unhandled unknown exception", exc_info=True)
243

  
244

  
245
class _JobQueueWorker(workerpool.BaseWorker):
  """Worker class executing the jobs handed to it.

  """
  def RunTask(self, job):
    """Runs a single job on a newly created processor.

    The processor is built from the context stored on the owning pool
    and uses a no-op feedback callback. The final job status is logged
    even if running the job raised an exception.

    Args:
    - job: The job to execute; must provide id, Run() and GetStatus()

    """
    logging.debug("Worker %s processing job %s", self.worker_id, job.id)

    # TODO: feedback function
    processor = mcpu.Processor(self.pool.context, feedback=lambda x: None)
    try:
      job.Run(processor)
    finally:
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job.id, job.GetStatus())
256

  
257

  
258
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool for the job queue.

  Besides the base class state, the pool keeps the context object that
  its workers read when they create a processor.

  """
  def __init__(self, context):
    """Initializes the base pool, then stores the context.

    Args:
    - context: Object made available to the workers as pool.context

    """
    base = super(_JobQueueWorkerPool, self)
    base.__init__(JOBQUEUE_THREADS, _JobQueueWorker)
    self.context = context
263

  
264

  
265
class JobQueue:
  """The job queue.

  Keeps track of all submitted jobs by identifier and hands them to a
  worker pool for execution. The job table and the identifier counter
  are protected by an internal lock.

  """
  def __init__(self, context):
    """Sets up an empty queue and its worker pool.

    Args:
    - context: Object passed on to the worker pool

    """
    self._lock = threading.Lock()
    self._last_job_id = 0
    self._jobs = {}
    self._wpool = _JobQueueWorkerPool(context)

  def _NewJobIdUnlocked(self):
    """Generates a new job identifier.

    The caller must hold the queue lock.

    Returns: A string representing the job identifier.

    """
    self._last_job_id += 1
    return str(self._last_job_id)

  def SubmitJob(self, ops):
    """Add a new job to the queue.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    Args:
    - ops: Sequence of opcodes

    Returns: The identifier of the newly created job.

    """
    # Get job identifier
    self._lock.acquire()
    try:
      job_id = self._NewJobIdUnlocked()
    finally:
      self._lock.release()

    # Built outside the lock; raises if no opcodes were given
    job = _QueuedJob(ops, job_id)

    # Add it to our internal queue
    self._lock.acquire()
    try:
      self._jobs[job_id] = job
    finally:
      self._lock.release()

    # Add to worker pool
    self._wpool.AddTask(job)

    return job_id

  def ArchiveJob(self, job_id):
    """Not implemented yet.

    """
    raise NotImplementedError()

  def CancelJob(self, job_id):
    """Not implemented yet.

    """
    raise NotImplementedError()

  def _GetJobInfo(self, job, fields):
    """Builds the result row for one job.

    Args:
    - job: The job to describe
    - fields: Names of fields to return ("id", "status" or "result")

    Returns: A list with one value per requested field, in field order.

    Raises:
    - errors.OpExecError: if an unknown field name is requested

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.GetStatus())
      elif fname == "result":
        # TODO
        row.append([op.result for op in job._ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    Returns: A list with one entry per job identifier, ordered by
    identifier; each entry is the list of requested field values, or
    None if the job is not known.

    """
    self._lock.acquire()
    try:
      if not job_ids:
        job_ids = self._jobs.keys()

      # TODO: define sort order?
      # Sort a copy: the caller's sequence must not be reordered, and it
      # need not be a list (any sequence or iterable of ids is fine).
      ordered_ids = sorted(job_ids)

      jobs = []

      for job_id in ordered_ids:
        job = self._jobs.get(job_id, None)
        if job is None:
          jobs.append(None)
        else:
          jobs.append(self._GetJobInfo(job, fields))

      return jobs
    finally:
      self._lock.release()

  def Shutdown(self):
    """Stops the job queue.

    """
    self._wpool.TerminateWorkers()

Also available in: Unified diff