lib/jqueue.py @ 94ed59a5
#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking:
There's a single, large lock in the JobQueue class. It's used by all other
classes in this module.

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc

from ganeti.rpc import RpcRunner

JOBQUEUE_THREADS = 25


def TimeStampNow():
  return utils.SplitTime(time.time())
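
# utils.SplitTime() splits a floating-point POSIX timestamp into a
# (seconds, microseconds) integer pair, so timestamps survive JSON
# serialization without losing precision. A hypothetical example:
#
#   >>> TimeStampNow()    # at Unix time 1215000000.25
#   (1215000000, 250000)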


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  The 'log' attribute holds the execution log and consists of tuples
  of the form (log_serial, timestamp, level, message).

  """
  def __init__(self, op):
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }
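
# Serialize() and Restore() are intended as inverses: Serialize() emits a
# plain dict that survives JSON encoding, Restore() rebuilds an equivalent
# object from it. A hypothetical round trip (OpTestDelay is just a
# stand-in for any opcode):
#
#   >>> qop = _QueuedOpCode(opcodes.OpTestDelay(duration=0))
#   >>> state = serializer.LoadJson(serializer.DumpJson(qop.Serialize()))
#   >>> _QueuedOpCode.Restore(state).status == qop.status
#   True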


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must be taken
  care of by users of this class.

  """
  def __init__(self, queue, job_id, ops):
    if not ops:
      # TODO
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
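
  # The loop above maps opcode states to a single job state: an error or
  # cancellation decides the job immediately, otherwise the most active
  # opcode wins, and only an all-success opcode list yields
  # JOB_STATUS_SUCCESS. For example, opcode statuses
  # (SUCCESS, RUNNING, QUEUED) leave the job at JOB_STATUS_RUNNING,
  # since a queued opcode changes nothing.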

  def GetLogEntries(self, newer_than):
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
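
  # GetLogEntries(None) therefore returns the full log, while e.g.
  # GetLogEntries(3) returns only entries with serial 4 or higher; this
  # is what WaitForJobChanges() relies on further down.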


class _JobQueueWorker(workerpool.BaseWorker):
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)
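
  # Lock discipline in RunTask: the queue lock is held only for short
  # bookkeeping updates, while proc.ExecOpCode() itself runs unlocked so
  # that the _Log closure (and _NotifyStart) can re-acquire the lock for
  # every log entry without deadlocking.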


class _JobQueueWorkerPool(workerpool.WorkerPool):
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This decorator should be used for all "public" functions, i.e. functions
    usually called from other classes.

    Important: Use this decorator only after utils.LockedMethod!

    Example:
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper
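
  # With the decorator order shown in the docstring, utils.LockedMethod
  # is outermost: it takes self._lock before the _RequireOpenQueue check
  # runs, so the queue cannot be closed between the assertion and the
  # wrapped call.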

  def __init__(self, context):
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = set(self.context.cfg.GetNodeList())

    # Remove master node
    try:
      self._nodes.remove(self._my_hostname)
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)

    # We need to lock here because WorkerPool.AddTask() may start a job while
    # we're still doing our work.
    self.acquire()
    try:
      for job in self._GetJobsUnlocked(None):
        # a failure in loading the job can cause 'None' to be returned
        if job is None:
          continue

        status = job.CalcStatus()

        if status in (constants.JOB_STATUS_QUEUED, ):
          self._wpool.AddTask(job)

        elif status in (constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            for op in job.ops:
              op.status = constants.OP_STATUS_ERROR
              op.result = "Unclean master daemon shutdown"
          finally:
            self.UpdateJobUnlocked(job)
    finally:
      self.release()

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node_name):
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    RpcRunner.call_jobqueue_purge(node_name)

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = RpcRunner.call_jobqueue_update([node_name], file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes.add(node_name)

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    try:
      # The queue is removed by the "leave node" RPC call.
      self._nodes.remove(node_name)
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    """
    utils.WriteFile(file_name, data=data)

    result = RpcRunner.call_jobqueue_update(self._nodes, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFileUnlocked(self, old, new):
    os.rename(old, new)

    result = RpcRunner.call_jobqueue_rename(self._nodes, old, new)
    self._CheckRpcResult(result, self._nodes,
                         "Moving %s to %s" % (old, new))

  def _FormatJobID(self, job_id):
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)
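
  # A numeric serial such as 42 thus becomes the string "42", which
  # _GetJobPath() below turns into QUEUE_DIR/job-42 and _ExtractJobID()
  # recovers from the file name again.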

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: A string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)
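
  # Note the ordering above: the new serial is written and replicated
  # before self._last_serial is updated, so a failed write cannot leave
  # the in-memory counter ahead of the on-disk one.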

  @staticmethod
  def _GetJobPath(job_id):
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)

  @classmethod
  def _ExtractJobID(cls, name):
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFileUnlocked(filepath, new_path)
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked as drained.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True
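
  # A hypothetical drain cycle from a caller's point of view:
  #
  #   >>> JobQueue.SetDrainFlag(True)   # SubmitJob() now raises
  #   True                              # errors.JobQueueDrainError
  #   >>> JobQueue.SetDrainFlag(False)  # queue accepts jobs again
  #   True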

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError()

    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()
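
  # The notifyAll() above wakes every thread blocked in
  # WaitForJobChanges() on this job's condition variable, which shares
  # the queue lock (see _QueuedJob.__init__).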

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait

    """
    logging.debug("Waiting for changes in job %s", job_id)
    end_time = time.time() + timeout

    # In case the job disappears while we wait, don't leave these unbound
    job_info = None
    log_entries = None

    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will
        # be no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)
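
  # A hypothetical call polling job "42" for status changes, with no
  # previous information and no log entries seen yet:
  #
  #   >>> queue.WaitForJobChanges("42", ["status"], None, 0, 1.0)
  #   (['running'], [])   # or constants.JOB_NOTCHANGED on timeout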

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    @type job_id: string
    @param job_id: Job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
      logging.debug("Job %s is no longer in the queue", job.id)
      return

    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_CANCELED
        op.result = "Job cancelled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobUnlocked(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                constants.JOB_STATUS_SUCCESS,
                                constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
      return

    old = self._GetJobPath(job.id)
    new = self._GetArchivedJobPath(job.id)

    self._RenameFileUnlocked(old, new)

    logging.debug("Successfully archived job %s", job.id)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    return self._ArchiveJobUnlocked(job_id)

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    for jid in self._GetJobIDsUnlocked(archived=False):
      job = self._LoadJobUnlocked(jid)
      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue
      if job.CalcStatus() not in (constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR,
                                  constants.JOB_STATUS_CANCELED):
        continue
      if job.end_timestamp is None:
        if job.start_timestamp is None:
          job_age = job.received_timestamp
        else:
          job_age = job.start_timestamp
      else:
        job_age = job.end_timestamp

      if age == -1 or now - job_age[0] > age:
        self._ArchiveJobUnlocked(jid)
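
  # For example, AutoArchiveJobs(6 * 3600) archives finished jobs older
  # than six hours, while AutoArchiveJobs(-1) archives every job that is
  # no longer running or queued, regardless of age. Timestamps are
  # (seconds, microseconds) pairs, hence the job_age[0] above.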

  def _GetJobInfoUnlocked(self, job, fields):
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row
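
  # A hypothetical query for two fields on a finished job:
  #
  #   >>> queue._GetJobInfoUnlocked(job, ["id", "status"])
  #   ['42', 'success']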

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None