#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking:
There's a single, large lock in the JobQueue class. It's used by all other
classes in this module.

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc

from ganeti.rpc import RpcRunner

JOBQUEUE_THREADS = 25


def TimeStampNow():
  return utils.SplitTime(time.time())
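
# TimeStampNow() wraps utils.SplitTime(), which presumably splits the float
# timestamp into a pair of integers (whole seconds first) so that timestamps
# survive JSON serialization without precision loss.  Indicative value only:
#
#   TimeStampNow()   # -> something like (1215418746, 380232)
#
# The rest of this module only relies on index [0] being the seconds part.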


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  The 'log' attribute holds the execution log and consists of tuples
  of the form (log_serial, timestamp, level, message).

  """
  def __init__(self, op):
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }
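
  # Illustrative round trip; the exact "input" mapping depends on the opcode's
  # own __getstate__() and the values shown are indicative only:
  #
  #   op = _QueuedOpCode(some_opcode)        # some_opcode: any opcodes.OpCode
  #   state = op.Serialize()
  #   # e.g. {"input": {...}, "status": "queued", "result": None, "log": [],
  #   #       "start_timestamp": None, "end_timestamp": None}
  #   copy = _QueuedOpCode.Restore(state)    # equivalent, freshly built object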


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must be taken
  care of by users of this class.

  """
  def __init__(self, queue, job_id, ops):
    if not ops:
      # TODO
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
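
  # The loop above lets a later opcode's state override the job status only
  # until an error or cancellation is seen.  Indicative examples: opcode
  # statuses [success, running, queued] yield a running job, while
  # [success, error, queued] yield an error job regardless of the rest.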

  def GetLogEntries(self, newer_than):
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries


class _JobQueueWorker(workerpool.BaseWorker):
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)
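
  # Summary of the per-opcode life cycle driven by RunTask(): each opcode goes
  # QUEUED -> WAITLOCK (just before ExecOpCode) -> RUNNING (via _NotifyStart,
  # once the LU holds its locks) -> SUCCESS or ERROR.  The on-disk job file is
  # rewritten through UpdateJobUnlocked() for the WAITLOCK, SUCCESS and ERROR
  # transitions; the RUNNING transition is only recorded in memory.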


class _JobQueueWorkerPool(workerpool.WorkerPool):
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all "public" functions. That is, functions
    usually called from other classes.

    Important: Use this decorator only after utils.LockedMethod!

    Example:
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values())

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)

    # We need to lock here because WorkerPool.AddTask() may start a job while
    # we're still doing our work.
    self.acquire()
    try:
      for job in self._GetJobsUnlocked(None):
        # a failure in loading the job can cause 'None' to be returned
        if job is None:
          continue

        status = job.CalcStatus()

        if status in (constants.JOB_STATUS_QUEUED, ):
          self._wpool.AddTask(job)

        elif status in (constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            for op in job.ops:
              op.status = constants.OP_STATUS_ERROR
              op.result = "Unclean master daemon shutdown"
          finally:
            self.UpdateJobUnlocked(job)
    finally:
      self.release()

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    RpcRunner.call_jobqueue_purge(node_name)

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = RpcRunner.call_jobqueue_update([node_name], [node.primary_ip],
                                              file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFileUnlocked(self, old, new):
    os.rename(old, new)

    names, addrs = self._GetNodeIp()
    result = RpcRunner.call_jobqueue_rename(names, addrs, old, new)
    self._CheckRpcResult(result, self._nodes,
                         "Moving %s to %s" % (old, new))

  def _FormatJobID(self, job_id):
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: A string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)
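
  # Indicative example of the naming scheme implemented by the helpers below:
  # a freshly allocated serial of 42 becomes the job ID string "42", stored as
  # "job-42" under QUEUE_DIR and moved to JOB_QUEUE_ARCHIVE_DIR with the same
  # basename once archived.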

  @staticmethod
  def _GetJobPath(job_id):
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)

  @classmethod
  def _ExtractJobID(cls, name):
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFileUnlocked(filepath, new_path)
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked to be drained.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True
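
  # Indicative drain usage (caller names assumed, e.g. from the master daemon):
  #
  #   queue.SetDrainFlag(True)     # creates JOB_QUEUE_DRAIN_FILE
  #   queue.SubmitJob(ops)         # would now raise errors.JobQueueDrainError
  #   queue.SetDrainFlag(False)    # removes the flag file again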

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError()
    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id
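
  # A successful SubmitJob() call therefore leaves three traces: the job file
  # on disk (replicated to the other nodes), the _QueuedJob object in the
  # memcache, and a task queued on the worker pool; the return value is the
  # job ID string produced by _NewSerialUnlocked().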

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait

    """
    logging.debug("Waiting for changes in job %s", job_id)
    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)
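
  # Indicative caller loop (names assumed): keep feeding the previous answer
  # back in, so only real changes or a timeout wake the client:
  #
  #   prev_info = prev_serial = None
  #   while True:
  #     result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                      prev_serial, 60.0)
  #     if result == constants.JOB_NOTCHANGED:
  #       continue                      # timed out, poll again
  #     prev_info, log_entries = result
  #     # ... consume log_entries, update prev_serial from their first field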

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    @type job_id: string
    @param job_id: Job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
      logging.debug("Job %s is no longer in the queue", job.id)
      return

    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_ERROR
        op.result = "Job cancelled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobUnlocked(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                constants.JOB_STATUS_SUCCESS,
                                constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
      return

    old = self._GetJobPath(job.id)
    new = self._GetArchivedJobPath(job.id)

    self._RenameFileUnlocked(old, new)

    logging.debug("Successfully archived job %s", job.id)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    return self._ArchiveJobUnlocked(job_id)

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    for jid in self._GetJobIDsUnlocked(archived=False):
      job = self._LoadJobUnlocked(jid)
      # CalcStatus() returns job-level statuses, so compare against those
      if job.CalcStatus() not in (constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR,
                                  constants.JOB_STATUS_CANCELED):
        continue
      if job.end_timestamp is None:
        if job.start_timestamp is None:
          job_age = job.received_timestamp
        else:
          job_age = job.start_timestamp
      else:
        job_age = job.end_timestamp

      # Compare only the seconds part of the split timestamp
      if age == -1 or now - job_age[0] > age:
        self._ArchiveJobUnlocked(jid)

  def _GetJobInfoUnlocked(self, job, fields):
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs
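
  # Indicative result shape: one row per requested job, columns in the order
  # of the requested fields, and None for jobs that could not be loaded, e.g.
  #
  #   queue.QueryJobs(["1", "9999"], ["id", "status"])
  #   # -> [["1", "success"], None]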

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None