#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking:
There's a single, large lock in the JobQueue class. It's used by all other
classes in this module.

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25

def TimeStampNow():
  """Returns the current timestamp."""
  return utils.SplitTime(time.time())


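# Illustrative sketch, not part of the original module; it assumes
# utils.SplitTime splits a float timestamp into an integer
# (seconds, microseconds) pair:
#
#   TimeStampNow()   # hypothetical result: (1199145600, 250000)
#
# Split timestamps serialize to JSON without losing sub-second precision.

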
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  The 'log' attribute holds the execution log and consists of tuples
  of the form (log_serial, timestamp, level, message).

  """
  def __init__(self, op):
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }


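# Illustrative sketch, not part of the original module: Serialize() and
# Restore() are inverses, so persisting an opcode is a plain round-trip:
#
#   state = qop.Serialize()               # JSON-serializable dict
#   clone = _QueuedOpCode.Restore(state)  # equivalent _QueuedOpCode
#
# where qop is an existing _QueuedOpCode instance.

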
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must be taken
  care of by users of this class.

  """
  def __init__(self, queue, job_id, ops):
    if not ops:
      # TODO
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

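  # Illustrative sketch, not part of the original module: CalcStatus folds
  # the opcode statuses into one job status, e.g. for a three-opcode job:
  #
  #   [SUCCESS, RUNNING, QUEUED]  -> JOB_STATUS_RUNNING
  #   [SUCCESS, ERROR, QUEUED]    -> JOB_STATUS_ERROR (one failure fails the job)
  #   [SUCCESS, SUCCESS, SUCCESS] -> JOB_STATUS_SUCCESS
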
  def GetLogEntries(self, newer_than):
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries


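# Illustrative sketch, not part of the original module: if a job's opcodes
# hold log entries with serials 1 and 2, job.GetLogEntries(1) returns only
# the entry with serial 2, while job.GetLogEntries(None) returns all entries.

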
class _JobQueueWorker(workerpool.BaseWorker):
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)


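# Illustrative sketch, not part of the original module: the _Log closure
# defined in RunTask accepts one or two arguments, mirroring the feedback
# calls made by logical units:
#
#   _Log("copying disks")                  # log type defaults to ELOG_MESSAGE
#   _Log(some_elog_type, "copying disks")  # explicit log type (hypothetical)

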
class _JobQueueWorkerPool(workerpool.WorkerPool):
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  """Queue used to manage the jobs.

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all "public" functions. That is, functions
    usually called from other classes.

    Important: Use this decorator only after utils.LockedMethod!

    Example:
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = set(self.context.cfg.GetNodeList())

    # Remove master node
    try:
      # Removing a missing element from a set raises KeyError
      self._nodes.remove(self._my_hostname)
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)

    # We need to lock here because WorkerPool.AddTask() may start a job while
    # we're still doing our work.
    self.acquire()
    try:
      for job in self._GetJobsUnlocked(None):
        status = job.CalcStatus()

        if status in (constants.JOB_STATUS_QUEUED, ):
          self._wpool.AddTask(job)

        elif status in (constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            for op in job.ops:
              op.status = constants.OP_STATUS_ERROR
              op.result = "Unclean master daemon shutdown"
          finally:
            self.UpdateJobUnlocked(job)
    finally:
      self.release()

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node_name):
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.call_jobqueue_purge(node_name)

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.call_jobqueue_update([node_name], file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes.add(node_name)

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    try:
      # The queue is removed by the "leave node" RPC call.
      self._nodes.remove(node_name)
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    """
    utils.WriteFile(file_name, data=data)

    result = rpc.call_jobqueue_update(self._nodes, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFileUnlocked(self, old, new):
    os.rename(old, new)

    result = rpc.call_jobqueue_rename(self._nodes, old, new)
    self._CheckRpcResult(result, self._nodes,
                         "Moving %s to %s" % (old, new))

  def _FormatJobID(self, job_id):
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: A string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)

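  # Illustrative sketch, not part of the original module: a new serial N is
  # replicated to all nodes and becomes job ID str(N), stored as file "job-N"
  # under constants.QUEUE_DIR, e.g. serial 25 -> job "25" in
  # ".../queue/job-25" (the exact directory is an assumption here).
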
  @staticmethod
  def _GetJobPath(job_id):
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)

  @classmethod
  def _ExtractJobID(cls, name):
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    job = _QueuedJob.Restore(self, data)
    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.

    """
    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

594
  def UpdateJobUnlocked(self, job):
595
    filename = self._GetJobPath(job.id)
596
    data = serializer.DumpJson(job.Serialize(), indent=False)
597
    logging.debug("Writing job %s to %s", job.id, filename)
598
    self._WriteAndReplicateFileUnlocked(filename, data)
599

    
600
    # Notify waiters about potential changes
601
    job.change.notifyAll()
602

    
603
  @utils.LockedMethod
604
  @_RequireOpenQueue
605
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
606
                        timeout):
607
    """Waits for changes in a job.
608

609
    @type job_id: string
610
    @param job_id: Job identifier
611
    @type fields: list of strings
612
    @param fields: Which fields to check for changes
613
    @type prev_job_info: list or None
614
    @param prev_job_info: Last job information returned
615
    @type prev_log_serial: int
616
    @param prev_log_serial: Last job message serial number
617
    @type timeout: float
618
    @param timeout: maximum time to wait
619

620
    """
621
    logging.debug("Waiting for changes in job %s", job_id)
622
    end_time = time.time() + timeout
623
    while True:
624
      delta_time = end_time - time.time()
625
      if delta_time < 0:
626
        return constants.JOB_NOTCHANGED
627

    
628
      job = self._LoadJobUnlocked(job_id)
629
      if not job:
630
        logging.debug("Job %s not found", job_id)
631
        break
632

    
633
      status = job.CalcStatus()
634
      job_info = self._GetJobInfoUnlocked(job, fields)
635
      log_entries = job.GetLogEntries(prev_log_serial)
636

    
637
      # Serializing and deserializing data can cause type changes (e.g. from
638
      # tuple to list) or precision loss. We're doing it here so that we get
639
      # the same modifications as the data received from the client. Without
640
      # this, the comparison afterwards might fail without the data being
641
      # significantly different.
642
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
643
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
644

    
645
      if status not in (constants.JOB_STATUS_QUEUED,
646
                        constants.JOB_STATUS_RUNNING,
647
                        constants.JOB_STATUS_WAITLOCK):
648
        # Don't even try to wait if the job is no longer running, there will be
649
        # no changes.
650
        break
651

    
652
      if (prev_job_info != job_info or
653
          (log_entries and prev_log_serial != log_entries[0][0])):
654
        break
655

    
656
      logging.debug("Waiting again")
657

    
658
      # Release the queue lock while waiting
659
      job.change.wait(delta_time)
660

    
661
    logging.debug("Job %s changed", job_id)
662

    
663
    return (job_info, log_entries)
664

    
665
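  # Illustrative sketch, not part of the original module: clients typically
  # poll in a loop, feeding the previously returned values back in:
  #
  #   result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                    prev_serial, 10.0)
  #   if result == constants.JOB_NOTCHANGED:
  #     pass  # timed out, poll again
  #   else:
  #     (prev_info, log_entries) = result
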
  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    @type job_id: string
    @param job_id: Job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
      logging.debug("Job %s is no longer in the queue", job.id)
      return

    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_ERROR
        op.result = "Job cancelled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobUnlocked(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                constants.JOB_STATUS_SUCCESS,
                                constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
      return

    old = self._GetJobPath(job.id)
    new = self._GetArchivedJobPath(job.id)

    self._RenameFileUnlocked(old, new)

    logging.debug("Successfully archived job %s", job.id)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    return self._ArchiveJobUnlocked(job_id)

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs older than %s seconds", age)

    now = time.time()
    for jid in self._GetJobIDsUnlocked(archived=False):
      job = self._LoadJobUnlocked(jid)
      if job.CalcStatus() not in (constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR,
                                  constants.JOB_STATUS_CANCELED):
        continue

      if job.end_timestamp is None:
        if job.start_timestamp is None:
          job_age = job.received_timestamp
        else:
          job_age = job.start_timestamp
      else:
        job_age = job.end_timestamp

      if age == -1 or now - job_age[0] > age:
        self._ArchiveJobUnlocked(jid)

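  # Illustrative sketch, not part of the original module:
  #
  #   queue.AutoArchiveJobs(6 * 3600)  # archive finished jobs older than 6h
  #   queue.AutoArchiveJobs(-1)        # archive all finished jobs
  #
  # Only jobs whose status is success, error or canceled are considered.
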
  def _GetJobInfoUnlocked(self, job, fields):
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs

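  # Illustrative sketch, not part of the original module: the result has one
  # row per job and one value per requested field (status value hypothetical):
  #
  #   queue.QueryJobs(["1"], ["id", "status"])  # -> [["1", "success"]]
  #   queue.QueryJobs(None, ["id"])             # all jobs
  #
  # Unknown job IDs yield None rows.
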
  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None