root / lib / jqueue.py @ 686d7433

#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking:
There's a single, large lock in the JobQueue class. It's used by all other
classes in this module.

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc

from ganeti.rpc import RpcRunner

JOBQUEUE_THREADS = 25


def TimeStampNow():
  return utils.SplitTime(time.time())
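
# utils.SplitTime is assumed here to return an integer pair along the lines of
# (seconds, microseconds); storing timestamps in that form keeps them exact
# across JSON serialization, and code such as AutoArchiveJobs below reads the
# whole seconds from element [0].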


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  The 'log' attribute holds the execution log and consists of tuples
  of the form (log_serial, timestamp, level, message).

  """
  def __init__(self, op):
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }
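
  # Serialize and Restore are meant to round-trip through the JSON job files;
  # illustrative only, with qop standing for some _QueuedOpCode instance:
  #
  #   state = serializer.LoadJson(serializer.DumpJson(qop.Serialize()))
  #   restored = _QueuedOpCode.Restore(state)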


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must be taken
  care of by users of this class.

  """
  def __init__(self, queue, job_id, ops):
    if not ops:
      # TODO
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def GetLogEntries(self, newer_than):
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
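
  # Log entries are (log_serial, timestamp, type, message) tuples; callers such
  # as WaitForJobChanges pass in the highest serial they have already seen, so
  # e.g. job.GetLogEntries(3) only returns entries whose serial is greater
  # than 3, and job.GetLogEntries(None) returns everything.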


class _JobQueueWorker(workerpool.BaseWorker):
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)
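
  # Summary: RunTask moves each opcode through WAITLOCK -> RUNNING -> SUCCESS
  # (or ERROR), writing the job back to disk before and after execution, while
  # _Log appends message entries and wakes waiters through job.change.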


class _JobQueueWorkerPool(workerpool.WorkerPool):
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This decorator should be used for all "public" functions, that is,
    functions usually called from other classes.

    Important: Use this decorator only after utils.LockedMethod!

    Example:
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = set(self.context.cfg.GetNodeList())

    # Remove master node
    try:
      self._nodes.remove(self._my_hostname)
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)

    # We need to lock here because WorkerPool.AddTask() may start a job while
    # we're still doing our work.
    self.acquire()
    try:
      for job in self._GetJobsUnlocked(None):
        status = job.CalcStatus()

        if status in (constants.JOB_STATUS_QUEUED, ):
          self._wpool.AddTask(job)

        elif status in (constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            for op in job.ops:
              op.status = constants.OP_STATUS_ERROR
              op.result = "Unclean master daemon shutdown"
          finally:
            self.UpdateJobUnlocked(job)
    finally:
      self.release()

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node_name):
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    RpcRunner.call_jobqueue_purge(node_name)

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = RpcRunner.call_jobqueue_update([node_name], file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes.add(node_name)

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    try:
      # The queue is removed by the "leave node" RPC call.
      self._nodes.remove(node_name)
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    """
    utils.WriteFile(file_name, data=data)

    result = RpcRunner.call_jobqueue_update(self._nodes, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFileUnlocked(self, old, new):
    os.rename(old, new)

    result = RpcRunner.call_jobqueue_rename(self._nodes, old, new)
    self._CheckRpcResult(result, self._nodes,
                         "Moving %s to %s" % (old, new))

  def _FormatJobID(self, job_id):
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: A string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)
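
  # The serial file holds just the last used job number, e.g. "124\n"; the new
  # value is written and replicated before self._last_serial is updated, so an
  # identifier is only handed out once the write has succeeded.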

  @staticmethod
  def _GetJobPath(job_id):
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)

  @classmethod
  def _ExtractJobID(cls, name):
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    job = _QueuedJob.Restore(self, data)
    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked as drained.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
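
  # Illustrative sketch, not part of this module's API: with this scheme the
  # flag is toggled simply by creating or removing the drain file, e.g.
  #
  #   utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="")  # set drain
  #   os.unlink(constants.JOB_QUEUE_DRAIN_FILE)                 # clear drain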

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError()
    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id
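
  # Illustrative usage sketch (opcode name assumed, not defined here):
  #
  #   queue = JobQueue(context)
  #   job_id = queue.SubmitJob([opcodes.OpQueryClusterInfo()])
  #   info, logs = queue.WaitForJobChanges(job_id, ["status"], None, 0, 60.0)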

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait

    """
    logging.debug("Waiting for changes in job %s", job_id)
    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)
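
  # Callers typically loop on WaitForJobChanges, feeding the returned job_info
  # and the highest processed log serial back in, until the job leaves the
  # queued/waitlock/running states or constants.JOB_NOTCHANGED signals a
  # timeout.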

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    @type job_id: string
    @param job_id: Job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
      logging.debug("Job %s is no longer in the queue", job.id)
      return

    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_ERROR
        op.result = "Job cancelled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobUnlocked(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                constants.JOB_STATUS_SUCCESS,
                                constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
      return

    old = self._GetJobPath(job.id)
    new = self._GetArchivedJobPath(job.id)

    self._RenameFileUnlocked(old, new)

    logging.debug("Successfully archived job %s", job.id)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    return self._ArchiveJobUnlocked(job_id)

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    for jid in self._GetJobIDsUnlocked(archived=False):
      job = self._LoadJobUnlocked(jid)
      if job.CalcStatus() not in (constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR,
                                  constants.JOB_STATUS_CANCELED):
        continue
      if job.end_timestamp is None:
        if job.start_timestamp is None:
          job_age = job.received_timestamp
        else:
          job_age = job.start_timestamp
      else:
        job_age = job.end_timestamp

      if age == -1 or now - job_age[0] > age:
        self._ArchiveJobUnlocked(jid)
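
  # Example of the age check above: with age == 3600, a job whose relevant
  # timestamp is (1215542646, 0) is archived once time.time() exceeds
  # 1215542646 + 3600; age == -1 archives every finished job regardless of
  # its timestamps.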

  def _GetJobInfoUnlocked(self, job, fields):
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs
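
  # Illustrative only: QueryJobs(None, ["id", "status"]) yields one row per
  # known job, e.g. [["1", "success"], ["2", "running"]], with None in place
  # of jobs that could not be loaded from disk.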

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None