#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import errno
import re
import time
import weakref

try:
  # pylint: disable-msg=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000

# The Big JobQueue lock. As for all B*Lock conversions, it must be acquired in
# shared mode to ensure exclusion with legacy code, which acquires it
# exclusively. It may be skipped entirely only once concurrency with all new
# and legacy code is ensured.
_big_jqueue_lock = locking.SharedLock()


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
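
# Illustrative example (not part of the original module): TimeStampNow()
# simply splits time.time(), so a call made at Unix time 1234567890.123456
# would return roughly (1234567890, 123456).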


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      }
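
  # Illustrative sketch (assumed layout, not from the original source): the
  # dict produced by Serialize() mirrors the instance attributes and is what
  # Restore() later consumes, e.g. for a freshly queued opcode:
  #   {"input": <opcode state>, "status": "queued", "result": None,
  #    "log": [], "start_timestamp": None, "exec_timestamp": None,
  #    "end_timestamp": None}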


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar lock_status: In-memory locking information for debugging

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "lock_status", "change",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # In-memory attributes
    self.lock_status = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    # In-memory attributes
    obj.lock_status = None

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.OP_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
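
  # Worked example (illustrative, not from the original source): a job whose
  # opcode statuses are [success, running, queued] yields JOB_STATUS_RUNNING;
  # [success, success, success] yields JOB_STATUS_SUCCESS; a single error or
  # canceled opcode makes the whole job error/canceled respectively.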

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(self.id)
      elif fname == "status":
        row.append(self.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in self.ops])
      elif fname == "opresult":
        row.append([op.result for op in self.ops])
      elif fname == "opstatus":
        row.append([op.status for op in self.ops])
      elif fname == "oplog":
        row.append([op.log for op in self.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in self.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in self.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in self.ops])
      elif fname == "received_ts":
        row.append(self.received_timestamp)
      elif fname == "start_ts":
        row.append(self.start_timestamp)
      elif fname == "end_ts":
        row.append(self.end_timestamp)
      elif fname == "lock_status":
        row.append(self.lock_status)
      elif fname == "summary":
        row.append([op.input.Summary() for op in self.ops])
      else:
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
    return row

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    try:
      not_marked = True
      for op in self.ops:
        if op.status in constants.OPS_FINALIZED:
          assert not_marked, "Finalized opcodes found after non-finalized ones"
          continue
        op.status = status
        op.result = result
        not_marked = False
    finally:
      self.queue.UpdateJobUnlocked(self)


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    self._queue.acquire(shared=1)
    try:
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                                 constants.OP_STATUS_CANCELING)

      # All locks are acquired by now
      self._job.lock_status = None

      # Cancel here if we were asked to
      if self._op.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self._op.status = constants.OP_STATUS_RUNNING
      self._op.exec_timestamp = TimeStampNow()
    finally:
      self._queue.release()

  @locking.ssynchronized(_big_jqueue_lock, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)
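
  # Usage sketch (illustrative; "callbacks" is a hypothetical variable for an
  # _OpExecCallbacks instance): LU code may call either
  #   callbacks.Feedback("hello")                          # ELOG_MESSAGE
  # or
  #   callbacks.Feedback(constants.ELOG_MESSAGE, "hello")  # explicit type
  # both of which end up in the opcode's log via _AppendFeedback.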

  def ReportLocks(self, msg):
    """Write locking information to the job.

    Called whenever the LU processor is waiting for a lock or has acquired one.

    """
    # Not getting the queue lock because this is a single assignment
    self._job.lock_status = msg


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  @type job_id: string
  @ivar job_id: id of the job we're watching
  @type prev_job_info: string
  @ivar prev_job_info: previous job info, as passed by the luxi client
  @type prev_log_serial: string
  @ivar prev_log_serial: previous job serial, as passed by the luxi client
  @type queue: L{JobQueue}
  @ivar queue: job queue (used for a few utility functions)
  @type job_path: string
  @ivar job_path: absolute path of the job file
  @type wm: pyinotify.WatchManager (or None)
  @ivar wm: inotify watch manager to watch for changes
  @type inotify_handler: L{asyncnotifier.SingleFileEventHandler}
  @ivar inotify_handler: single file event handler, used for watching
  @type notifier: pyinotify.Notifier
  @ivar notifier: inotify single-threaded notifier, used for watching

  """
  def __init__(self, job_id, fields, prev_job_info, prev_log_serial, queue):
    self.job_id = job_id
    self.fields = fields
    self.prev_job_info = prev_job_info
    self.prev_log_serial = prev_log_serial
    self.queue = queue
    # pylint: disable-msg=W0212
    self.job_path = self.queue._GetJobPath(self.job_id)
    self.wm = None
    self.inotify_handler = None
    self.notifier = None

  def _SetupInotify(self):
    """Create the inotify watch manager and notifier.

    @raises errors.InotifyError: if the notifier cannot be set up

    """
    if self.wm:
      return
    self.wm = pyinotify.WatchManager()
    self.inotify_handler = asyncnotifier.SingleFileEventHandler(self.wm,
                                                                self.OnInotify,
                                                                self.job_path)
    self.notifier = pyinotify.Notifier(self.wm, self.inotify_handler)
    self.inotify_handler.enable()

  def _LoadDiskStatus(self):
    job = self.queue.SafeLoadJobFromDisk(self.job_id)
    if not job:
      raise errors.JobLost()
    self.job_status = job.CalcStatus()

    job_info = job.GetInfo(self.fields)
    log_entries = job.GetLogEntries(self.prev_log_serial)
    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    self.job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    self.log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

  def _CheckForChanges(self):
    self._LoadDiskStatus()
    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (self.job_status not in (constants.JOB_STATUS_QUEUED,
                                constants.JOB_STATUS_RUNNING,
                                constants.JOB_STATUS_WAITLOCK) or
        self.prev_job_info != self.job_info or
        (self.log_entries and self.prev_log_serial != self.log_entries[0][0])):
      logging.debug("Job %s changed", self.job_id)
      return (self.job_info, self.log_entries)

    raise utils.RetryAgain()

  def OnInotify(self, notifier_enabled):
    if not notifier_enabled:
      self.inotify_handler.enable()

  def WaitFn(self, timeout):
    self._SetupInotify()
    if self.notifier.check_events(timeout*1000):
      self.notifier.read_events()
    self.notifier.process_events()

  def WaitForChanges(self, timeout):
    try:
      return utils.Retry(self._CheckForChanges,
                         utils.RETRY_REMAINING_TIME,
                         timeout,
                         wait_fn=self.WaitFn)
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED

  def Close(self):
    if self.wm:
      self.notifier.stop()
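
  # Typical use, as done by JobQueue.WaitForJobChanges below: build the
  # helper, call WaitForChanges(timeout) and always call Close() afterwards
  # so the inotify notifier is stopped.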


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Processing job %s", job.id)
    proc = mcpu.Processor(self.pool.queue.context, job.id)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire(shared=1)
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            # Make sure not to hold queue lock while calling ExecOpCode
            result = proc.ExecOpCode(input_opcode,
                                     _OpExecCallbacks(queue, job, op))

            queue.acquire(shared=1)
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire(shared=1)
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                if isinstance(err, errors.GenericError):
                  op.result = errors.EncodeException(err)
                else:
                  op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire(shared=1)
        try:
          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                                "Job canceled by request")
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire(shared=1)
      try:
        try:
          job.lock_status = None
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()

      logging.info("Finished job %s, status = %s", job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_big_jqueue_lock)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable-msg=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    self.acquire = _big_jqueue_lock.acquire
    self.release = _big_jqueue_lock.release

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = 0
    self._UpdateQueueSizeUnlocked()
    self._drained = self._IsQueueMarkedDrain()

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                  "Unclean master daemon shutdown")

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @locking.ssynchronized(_big_jqueue_lock)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_big_jqueue_lock)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    utils.WriteFile(file_name, data=data)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
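
  # Example (illustrative): with JOBS_PER_ARCHIVE_DIRECTORY = 10000, job id
  # "12345" maps to archive directory "1" (12345 / 10000 == 1 with integer
  # division), i.e. archived jobs are grouped 10000 per subdirectory.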

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of strings representing the job identifiers.

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)

  def _GetJobIDsUnlocked(self, sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
      m = self._RE_JOB_FILE.match(filename)
      if m:
        jlist.append(m.group(1))
    if sort:
      jlist = utils.NiceSort(jlist)
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    try:
      job = self._LoadJobFromDisk(job_id)
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      raw_data = utils.ReadFile(filepath)
    except EnvironmentError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data)
    except Exception, err: # pylint: disable-msg=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @locking.ssynchronized(_big_jqueue_lock)
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @locking.ssynchronized(_big_jqueue_lock)
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    self._wpool.AddTask(self._SubmitJobUnlocked(job_id, ops))
    return job_id
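
  # Usage sketch (hypothetical caller, not part of this module):
  #   job_id = queue.SubmitJob([opcodes.OpTestDelay(duration=3)])
  #   queue.QueryJobs([job_id], ["status"])
  # The opcode used here is only an example; any list of opcodes.OpCode
  # instances works.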

  @locking.ssynchronized(_big_jqueue_lock)
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    tasks = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
        status = True
        data = job_id
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))
    self._wpool.AddManyTasks(tasks)

    return results

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    helper = _WaitForJobChangesHelper(job_id, fields, prev_job_info,
                                      prev_log_serial, self)
    try:
      return helper.WaitForChanges(timeout)
    finally:
      helper.Close()

  @locking.ssynchronized(_big_jqueue_lock)
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer waiting in the queue", job.id)
      return (False, "Job %s is no longer waiting in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                            "Job canceled by request")
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                  constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR):
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @locking.ssynchronized(_big_jqueue_lock)
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @locking.ssynchronized(_big_jqueue_lock)
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a
        list with the requested fields

    """
    jobs = []
    list_all = False
    if not job_ids:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
      list_all = True

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id)
      if job is not None:
        jobs.append(job.GetInfo(fields))
      elif not list_all:
        jobs.append(None)

    return jobs

  @locking.ssynchronized(_big_jqueue_lock)
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None