lib/jqueue.py @ b3855790

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import os
33
import logging
34
import threading
35
import errno
36
import re
37
import time
38
import weakref
39

    
40
from ganeti import constants
41
from ganeti import serializer
42
from ganeti import workerpool
43
from ganeti import opcodes
44
from ganeti import errors
45
from ganeti import mcpu
46
from ganeti import utils
47
from ganeti import jstore
48
from ganeti import rpc
49

    
50

    
51
JOBQUEUE_THREADS = 25
52
JOBS_PER_ARCHIVE_DIRECTORY = 10000
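# Jobs are grouped into archive directories holding at most this many job
# files each; see JobQueue._GetArchiveDirectory for the mapping.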
53

    
54

    
55
class CancelJob(Exception):
56
  """Special exception to cancel a job.
57

58
  """
59

    
60

    
61
def TimeStampNow():
62
  """Returns the current timestamp.
63

64
  @rtype: tuple
65
  @return: the current time in the (seconds, microseconds) format
66

67
  """
68
  return utils.SplitTime(time.time())
69

    
70

    
71
class _QueuedOpCode(object):
72
  """Encapsulates an opcode object.
73

74
  @ivar log: holds the execution log and consists of tuples
75
  of the form C{(log_serial, timestamp, level, message)}
76
  @ivar input: the OpCode we encapsulate
77
  @ivar status: the current status
78
  @ivar result: the result of the LU execution
79
  @ivar start_timestamp: timestamp for the start of the execution
80
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
81
  @ivar end_timestamp: timestamp for the end of the execution
82

83
  """
84
  __slots__ = ["input", "status", "result", "log",
85
               "start_timestamp", "exec_timestamp", "end_timestamp",
86
               "__weakref__"]
87

    
88
  def __init__(self, op):
89
    """Constructor for the _QuededOpCode.
90

91
    @type op: L{opcodes.OpCode}
92
    @param op: the opcode we encapsulate
93

94
    """
95
    self.input = op
96
    self.status = constants.OP_STATUS_QUEUED
97
    self.result = None
98
    self.log = []
99
    self.start_timestamp = None
100
    self.exec_timestamp = None
101
    self.end_timestamp = None
102

    
103
  @classmethod
104
  def Restore(cls, state):
105
    """Restore the _QueuedOpCode from the serialized form.
106

107
    @type state: dict
108
    @param state: the serialized state
109
    @rtype: _QueuedOpCode
110
    @return: a new _QueuedOpCode instance
111

112
    """
113
    obj = _QueuedOpCode.__new__(cls)
114
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
115
    obj.status = state["status"]
116
    obj.result = state["result"]
117
    obj.log = state["log"]
118
    obj.start_timestamp = state.get("start_timestamp", None)
119
    obj.exec_timestamp = state.get("exec_timestamp", None)
120
    obj.end_timestamp = state.get("end_timestamp", None)
121
    return obj
122

    
123
  def Serialize(self):
124
    """Serializes this _QueuedOpCode.
125

126
    @rtype: dict
127
    @return: the dictionary holding the serialized state
128

129
    """
130
    return {
131
      "input": self.input.__getstate__(),
132
      "status": self.status,
133
      "result": self.result,
134
      "log": self.log,
135
      "start_timestamp": self.start_timestamp,
136
      "exec_timestamp": self.exec_timestamp,
137
      "end_timestamp": self.end_timestamp,
138
      }
139

    
140

    
141
class _QueuedJob(object):
142
  """In-memory job representation.
143

144
  This is what we use to track the user-submitted jobs. Locking must
145
  be taken care of by users of this class.
146

147
  @type queue: L{JobQueue}
148
  @ivar queue: the parent queue
149
  @ivar id: the job ID
150
  @type ops: list
151
  @ivar ops: the list of _QueuedOpCode that constitute the job
152
  @type log_serial: int
153
  @ivar log_serial: holds the index for the next log entry
154
  @ivar received_timestamp: the timestamp for when the job was received
155
  @ivar start_timestamp: the timestamp for start of execution
156
  @ivar end_timestamp: the timestamp for end of execution
157
  @ivar lock_status: In-memory locking information for debugging
158
  @ivar change: a Condition variable we use for waiting for job changes
159

160
  """
161
  # pylint: disable-msg=W0212
162
  __slots__ = ["queue", "id", "ops", "log_serial",
163
               "received_timestamp", "start_timestamp", "end_timestamp",
164
               "lock_status", "change",
165
               "__weakref__"]
166

    
167
  def __init__(self, queue, job_id, ops):
168
    """Constructor for the _QueuedJob.
169

170
    @type queue: L{JobQueue}
171
    @param queue: our parent queue
172
    @type job_id: job_id
173
    @param job_id: our job id
174
    @type ops: list
175
    @param ops: the list of opcodes we hold, which will be encapsulated
176
        in _QueuedOpCodes
177

178
    """
179
    if not ops:
180
      raise errors.GenericError("A job needs at least one opcode")
181

    
182
    self.queue = queue
183
    self.id = job_id
184
    self.ops = [_QueuedOpCode(op) for op in ops]
185
    self.log_serial = 0
186
    self.received_timestamp = TimeStampNow()
187
    self.start_timestamp = None
188
    self.end_timestamp = None
189

    
190
    # In-memory attributes
191
    self.lock_status = None
192

    
193
    # Condition to wait for changes
194
    self.change = threading.Condition(self.queue._lock)
195

    
196
  def __repr__(self):
197
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
198
              "id=%s" % self.id,
199
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
200

    
201
    return "<%s at %#x>" % (" ".join(status), id(self))
202

    
203
  @classmethod
204
  def Restore(cls, queue, state):
205
    """Restore a _QueuedJob from serialized state:
206

207
    @type queue: L{JobQueue}
208
    @param queue: to which queue the restored job belongs
209
    @type state: dict
210
    @param state: the serialized state
211
    @rtype: _QueuedJob
212
    @return: the restored _QueuedJob instance
213

214
    """
215
    obj = _QueuedJob.__new__(cls)
216
    obj.queue = queue
217
    obj.id = state["id"]
218
    obj.received_timestamp = state.get("received_timestamp", None)
219
    obj.start_timestamp = state.get("start_timestamp", None)
220
    obj.end_timestamp = state.get("end_timestamp", None)
221

    
222
    # In-memory attributes
223
    obj.lock_status = None
224

    
225
    obj.ops = []
226
    obj.log_serial = 0
227
    for op_state in state["ops"]:
228
      op = _QueuedOpCode.Restore(op_state)
229
      for log_entry in op.log:
230
        obj.log_serial = max(obj.log_serial, log_entry[0])
231
      obj.ops.append(op)
232

    
233
    # Condition to wait for changes
234
    obj.change = threading.Condition(obj.queue._lock)
235

    
236
    return obj
237

    
238
  def Serialize(self):
239
    """Serialize the _JobQueue instance.
240

241
    @rtype: dict
242
    @return: the serialized state
243

244
    """
245
    return {
246
      "id": self.id,
247
      "ops": [op.Serialize() for op in self.ops],
248
      "start_timestamp": self.start_timestamp,
249
      "end_timestamp": self.end_timestamp,
250
      "received_timestamp": self.received_timestamp,
251
      }
252

    
253
  def CalcStatus(self):
254
    """Compute the status of this job.
255

256
    This function iterates over all the _QueuedOpCodes in the job and
257
    based on their status, computes the job status.
258

259
    The algorithm is:
260
      - if we find a cancelled opcode, or one that finished with error,
        the job status will be the same
262
      - otherwise, the last opcode with the status one of:
263
          - waitlock
264
          - canceling
265
          - running
266

267
        will determine the job status
268

269
      - otherwise, it means either all opcodes are queued, or success,
270
        and the job status will be the same
271

272
    @return: the job status
273

274
    """
275
    status = constants.JOB_STATUS_QUEUED
276

    
277
    all_success = True
278
    for op in self.ops:
279
      if op.status == constants.OP_STATUS_SUCCESS:
280
        continue
281

    
282
      all_success = False
283

    
284
      if op.status == constants.OP_STATUS_QUEUED:
285
        pass
286
      elif op.status == constants.OP_STATUS_WAITLOCK:
287
        status = constants.JOB_STATUS_WAITLOCK
288
      elif op.status == constants.OP_STATUS_RUNNING:
289
        status = constants.JOB_STATUS_RUNNING
290
      elif op.status == constants.OP_STATUS_CANCELING:
291
        status = constants.JOB_STATUS_CANCELING
292
        break
293
      elif op.status == constants.OP_STATUS_ERROR:
294
        status = constants.JOB_STATUS_ERROR
295
        # The whole job fails if one opcode failed
296
        break
297
      elif op.status == constants.OP_STATUS_CANCELED:
298
        status = constants.JOB_STATUS_CANCELED
299
        break
300

    
301
    if all_success:
302
      status = constants.JOB_STATUS_SUCCESS
303

    
304
    return status
305

    
306
  def GetLogEntries(self, newer_than):
307
    """Selectively returns the log entries.
308

309
    @type newer_than: None or int
310
    @param newer_than: if this is None, return all log entries,
311
        otherwise return only the log entries with serial higher
312
        than this value
313
    @rtype: list
314
    @return: the list of the log entries selected
315

316
    """
317
    if newer_than is None:
318
      serial = -1
319
    else:
320
      serial = newer_than
321

    
322
    entries = []
323
    for op in self.ops:
324
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
325

    
326
    return entries
327

    
328
  def GetInfo(self, fields):
329
    """Returns information about a job.
330

331
    @type fields: list
332
    @param fields: names of fields to return
333
    @rtype: list
334
    @return: list with one element for each field
335
    @raise errors.OpExecError: when an invalid field
336
        has been passed
337

338
    """
339
    row = []
340
    for fname in fields:
341
      if fname == "id":
342
        row.append(self.id)
343
      elif fname == "status":
344
        row.append(self.CalcStatus())
345
      elif fname == "ops":
346
        row.append([op.input.__getstate__() for op in self.ops])
347
      elif fname == "opresult":
348
        row.append([op.result for op in self.ops])
349
      elif fname == "opstatus":
350
        row.append([op.status for op in self.ops])
351
      elif fname == "oplog":
352
        row.append([op.log for op in self.ops])
353
      elif fname == "opstart":
354
        row.append([op.start_timestamp for op in self.ops])
355
      elif fname == "opexec":
356
        row.append([op.exec_timestamp for op in self.ops])
357
      elif fname == "opend":
358
        row.append([op.end_timestamp for op in self.ops])
359
      elif fname == "received_ts":
360
        row.append(self.received_timestamp)
361
      elif fname == "start_ts":
362
        row.append(self.start_timestamp)
363
      elif fname == "end_ts":
364
        row.append(self.end_timestamp)
365
      elif fname == "lock_status":
366
        row.append(self.lock_status)
367
      elif fname == "summary":
368
        row.append([op.input.Summary() for op in self.ops])
369
      else:
370
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
371
    return row
372

    
373
  def MarkUnfinishedOps(self, status, result):
374
    """Mark unfinished opcodes with a given status and result.
375

376
    This is a utility function for marking all running or waiting to
377
    be run opcodes with a given status. Opcodes which are already
378
    finalised are not changed.
379

380
    @param status: a given opcode status
381
    @param result: the opcode result
382

383
    """
384
    not_marked = True
385
    for op in self.ops:
386
      if op.status in constants.OPS_FINALIZED:
387
        assert not_marked, "Finalized opcodes found after non-finalized ones"
388
        continue
389
      op.status = status
390
      op.result = result
391
      not_marked = False
392

    
393

    
394
class _OpExecCallbacks(mcpu.OpExecCbBase):
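  """Callbacks used while executing a single opcode of a queued job.

  Instances of this class are passed to L{mcpu.Processor.ExecOpCode} so the
  job queue is notified about lock waits, execution start and log messages.

  """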
395
  def __init__(self, queue, job, op):
396
    """Initializes this class.
397

398
    @type queue: L{JobQueue}
399
    @param queue: Job queue
400
    @type job: L{_QueuedJob}
401
    @param job: Job object
402
    @type op: L{_QueuedOpCode}
403
    @param op: OpCode
404

405
    """
406
    assert queue, "Queue is missing"
407
    assert job, "Job is missing"
408
    assert op, "Opcode is missing"
409

    
410
    self._queue = queue
411
    self._job = job
412
    self._op = op
413

    
414
  def NotifyStart(self):
415
    """Mark the opcode as running, not lock-waiting.
416

417
    This is called from the mcpu code as a notifier function, when the LU is
418
    finally about to start the Exec() method. Of course, to have end-user
419
    visible results, the opcode must be initially (before calling into
420
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
421

422
    """
423
    self._queue.acquire()
424
    try:
425
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
426
                                 constants.OP_STATUS_CANCELING)
427

    
428
      # All locks are acquired by now
429
      self._job.lock_status = None
430

    
431
      # Cancel here if we were asked to
432
      if self._op.status == constants.OP_STATUS_CANCELING:
433
        raise CancelJob()
434

    
435
      self._op.status = constants.OP_STATUS_RUNNING
436
      self._op.exec_timestamp = TimeStampNow()
437
    finally:
438
      self._queue.release()
439

    
440
  def Feedback(self, *args):
441
    """Append a log entry.
442

443
    """
444
    assert len(args) < 3
445

    
446
    if len(args) == 1:
447
      log_type = constants.ELOG_MESSAGE
448
      log_msg = args[0]
449
    else:
450
      (log_type, log_msg) = args
451

    
452
    # The time is split to make serialization easier and not lose
453
    # precision.
454
    timestamp = utils.SplitTime(time.time())
455

    
456
    self._queue.acquire()
457
    try:
458
      self._job.log_serial += 1
459
      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
460
      self._queue.UpdateJobUnlocked(self._job, replicate=False)
461

    
462
      self._job.change.notifyAll()
463
    finally:
464
      self._queue.release()
465

    
466
  def ReportLocks(self, msg):
467
    """Write locking information to the job.
468

469
    Called whenever the LU processor is waiting for a lock or has acquired one.
470

471
    """
472
    # Not getting the queue lock because this is a single assignment
473
    self._job.lock_status = msg
474

    
475

    
476
class _JobQueueWorker(workerpool.BaseWorker):
477
  """The actual job workers.
478

479
  """
480
  def RunTask(self, job): # pylint: disable-msg=W0221
481
    """Job executor.
482

483
    This function processes a job. It is closely tied to the _QueuedJob and
484
    _QueuedOpCode classes.
485

486
    @type job: L{_QueuedJob}
487
    @param job: the job to be processed
488

489
    """
490
    logging.info("Processing job %s", job.id)
491
    proc = mcpu.Processor(self.pool.queue.context, job.id)
492
    queue = job.queue
493
    try:
494
      try:
495
        count = len(job.ops)
496
        for idx, op in enumerate(job.ops):
497
          op_summary = op.input.Summary()
498
          if op.status == constants.OP_STATUS_SUCCESS:
499
            # this is a job that was partially completed before master
500
            # daemon shutdown, so it can be expected that some opcodes
501
            # are already completed successfully (if any did error
502
            # out, then the whole job should have been aborted and not
503
            # resubmitted for processing)
504
            logging.info("Op %s/%s: opcode %s already processed, skipping",
505
                         idx + 1, count, op_summary)
506
            continue
507
          try:
508
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
509
                         op_summary)
510

    
511
            queue.acquire()
512
            try:
513
              if op.status == constants.OP_STATUS_CANCELED:
514
                raise CancelJob()
515
              assert op.status == constants.OP_STATUS_QUEUED
516
              op.status = constants.OP_STATUS_WAITLOCK
517
              op.result = None
518
              op.start_timestamp = TimeStampNow()
519
              if idx == 0: # first opcode
520
                job.start_timestamp = op.start_timestamp
521
              queue.UpdateJobUnlocked(job)
522

    
523
              input_opcode = op.input
524
            finally:
525
              queue.release()
526

    
527
            # Make sure not to hold queue lock while calling ExecOpCode
528
            result = proc.ExecOpCode(input_opcode,
529
                                     _OpExecCallbacks(queue, job, op))
530

    
531
            queue.acquire()
532
            try:
533
              op.status = constants.OP_STATUS_SUCCESS
534
              op.result = result
535
              op.end_timestamp = TimeStampNow()
536
              queue.UpdateJobUnlocked(job)
537
            finally:
538
              queue.release()
539

    
540
            logging.info("Op %s/%s: Successfully finished opcode %s",
541
                         idx + 1, count, op_summary)
542
          except CancelJob:
543
            # Will be handled further up
544
            raise
545
          except Exception, err:
546
            queue.acquire()
547
            try:
548
              try:
549
                op.status = constants.OP_STATUS_ERROR
550
                if isinstance(err, errors.GenericError):
551
                  op.result = errors.EncodeException(err)
552
                else:
553
                  op.result = str(err)
554
                op.end_timestamp = TimeStampNow()
555
                logging.info("Op %s/%s: Error in opcode %s: %s",
556
                             idx + 1, count, op_summary, err)
557
              finally:
558
                queue.UpdateJobUnlocked(job)
559
            finally:
560
              queue.release()
561
            raise
562

    
563
      except CancelJob:
564
        queue.acquire()
565
        try:
566
          queue.CancelJobUnlocked(job)
567
        finally:
568
          queue.release()
569
      except errors.GenericError, err:
570
        logging.exception("Ganeti exception")
571
      except:
572
        logging.exception("Unhandled exception")
573
    finally:
574
      queue.acquire()
575
      try:
576
        try:
577
          job.lock_status = None
578
          job.end_timestamp = TimeStampNow()
579
          queue.UpdateJobUnlocked(job)
580
        finally:
581
          job_id = job.id
582
          status = job.CalcStatus()
583
      finally:
584
        queue.release()
585

    
586
      logging.info("Finished job %s, status = %s", job_id, status)
587

    
588

    
589
class _JobQueueWorkerPool(workerpool.WorkerPool):
590
  """Simple class implementing a job-processing workerpool.
591

592
  """
593
  def __init__(self, queue):
594
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
595
                                              JOBQUEUE_THREADS,
596
                                              _JobQueueWorker)
597
    self.queue = queue
598

    
599

    
600
def _RequireOpenQueue(fn):
601
  """Decorator for "public" functions.
602

603
  This function should be used for all 'public' functions. That is,
604
  functions usually called from other classes. Note that this should
605
  be applied only to methods (not plain functions), since it expects
606
  that the decorated function is called with a first argument that has
607
  a '_queue_filelock' attribute.
608

609
  @warning: Use this decorator only after utils.LockedMethod!
610

611
  Example::
612
    @utils.LockedMethod
613
    @_RequireOpenQueue
614
    def Example(self):
615
      pass
616

617
  """
618
  def wrapper(self, *args, **kwargs):
619
    # pylint: disable-msg=W0212
620
    assert self._queue_filelock is not None, "Queue should be open"
621
    return fn(self, *args, **kwargs)
622
  return wrapper
623

    
624

    
625
class JobQueue(object):
626
  """Queue used to manage the jobs.
627

628
  @cvar _RE_JOB_FILE: regex matching the valid job file names
629

630
  """
631
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
632

    
633
  def __init__(self, context):
634
    """Constructor for JobQueue.
635

636
    The constructor will initialize the job queue object and then
637
    start loading the current jobs from disk, either for starting them
638
    (if they were queued) or for aborting them (if they were already
639
    running).
640

641
    @type context: GanetiContext
642
    @param context: the context object for access to the configuration
643
        data and other ganeti objects
644

645
    """
646
    self.context = context
647
    self._memcache = weakref.WeakValueDictionary()
648
    self._my_hostname = utils.HostInfo().name
649

    
650
    # Locking
651
    self._lock = threading.Lock()
652
    self.acquire = self._lock.acquire
653
    self.release = self._lock.release
654

    
655
    # Initialize the queue, and acquire the filelock.
656
    # This ensures no other process is working on the job queue.
657
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
658

    
659
    # Read serial file
660
    self._last_serial = jstore.ReadSerial()
661
    assert self._last_serial is not None, ("Serial file was modified between"
662
                                           " check in jstore and here")
663

    
664
    # Get initial list of nodes
665
    self._nodes = dict((n.name, n.primary_ip)
666
                       for n in self.context.cfg.GetAllNodesInfo().values()
667
                       if n.master_candidate)
668

    
669
    # Remove master node
670
    self._nodes.pop(self._my_hostname, None)
671

    
672
    # TODO: Check consistency across nodes
673

    
674
    self._queue_size = 0
675
    self._UpdateQueueSizeUnlocked()
676
    self._drained = self._IsQueueMarkedDrain()
677

    
678
    # Setup worker pool
679
    self._wpool = _JobQueueWorkerPool(self)
680
    try:
681
      # We need to lock here because WorkerPool.AddTask() may start a job while
682
      # we're still doing our work.
683
      self.acquire()
684
      try:
685
        logging.info("Inspecting job queue")
686

    
687
        all_job_ids = self._GetJobIDsUnlocked()
688
        jobs_count = len(all_job_ids)
689
        lastinfo = time.time()
690
        for idx, job_id in enumerate(all_job_ids):
691
          # Give an update every 1000 jobs or 10 seconds
692
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
693
              idx == (jobs_count - 1)):
694
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
695
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
696
            lastinfo = time.time()
697

    
698
          job = self._LoadJobUnlocked(job_id)
699

    
700
          # a failure in loading the job can cause 'None' to be returned
701
          if job is None:
702
            continue
703

    
704
          status = job.CalcStatus()
705

    
706
          if status in (constants.JOB_STATUS_QUEUED, ):
707
            self._wpool.AddTask(job)
708

    
709
          elif status in (constants.JOB_STATUS_RUNNING,
710
                          constants.JOB_STATUS_WAITLOCK,
711
                          constants.JOB_STATUS_CANCELING):
712
            logging.warning("Unfinished job %s found: %s", job.id, job)
713
            try:
714
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
715
                                    "Unclean master daemon shutdown")
716
            finally:
717
              self.UpdateJobUnlocked(job)
718

    
719
        logging.info("Job queue inspection finished")
720
      finally:
721
        self.release()
722
    except:
723
      self._wpool.TerminateWorkers()
724
      raise
725

    
726
  @utils.LockedMethod
727
  @_RequireOpenQueue
728
  def AddNode(self, node):
729
    """Register a new node with the queue.
730

731
    @type node: L{objects.Node}
732
    @param node: the node object to be added
733

734
    """
735
    node_name = node.name
736
    assert node_name != self._my_hostname
737

    
738
    # Clean queue directory on added node
739
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
740
    msg = result.fail_msg
741
    if msg:
742
      logging.warning("Cannot cleanup queue directory on node %s: %s",
743
                      node_name, msg)
744

    
745
    if not node.master_candidate:
746
      # remove if existing, ignoring errors
747
      self._nodes.pop(node_name, None)
748
      # and skip the replication of the job ids
749
      return
750

    
751
    # Upload the whole queue excluding archived jobs
752
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
753

    
754
    # Upload current serial file
755
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
756

    
757
    for file_name in files:
758
      # Read file content
759
      content = utils.ReadFile(file_name)
760

    
761
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
762
                                                  [node.primary_ip],
763
                                                  file_name, content)
764
      msg = result[node_name].fail_msg
765
      if msg:
766
        logging.error("Failed to upload file %s to node %s: %s",
767
                      file_name, node_name, msg)
768

    
769
    self._nodes[node_name] = node.primary_ip
770

    
771
  @utils.LockedMethod
772
  @_RequireOpenQueue
773
  def RemoveNode(self, node_name):
774
    """Callback called when removing nodes from the cluster.
775

776
    @type node_name: str
777
    @param node_name: the name of the node to remove
778

779
    """
780
    self._nodes.pop(node_name, None)
781

    
782
  @staticmethod
783
  def _CheckRpcResult(result, nodes, failmsg):
784
    """Verifies the status of an RPC call.
785

786
    Since we aim to keep consistency should this node (the current
787
    master) fail, we will log errors if our rpc calls fail, and especially
788
    log the case when more than half of the nodes fail.
789

790
    @param result: the data as returned from the rpc call
791
    @type nodes: list
792
    @param nodes: the list of nodes we made the call to
793
    @type failmsg: str
794
    @param failmsg: the identifier to be used for logging
795

796
    """
797
    failed = []
798
    success = []
799

    
800
    for node in nodes:
801
      msg = result[node].fail_msg
802
      if msg:
803
        failed.append(node)
804
        logging.error("RPC call %s (%s) failed on node %s: %s",
805
                      result[node].call, failmsg, node, msg)
806
      else:
807
        success.append(node)
808

    
809
    # +1 for the master node
810
    if (len(success) + 1) < len(failed):
811
      # TODO: Handle failing nodes
812
      logging.error("More than half of the nodes failed")
813

    
814
  def _GetNodeIp(self):
815
    """Helper for returning the node name/ip list.
816

817
    @rtype: (list, list)
818
    @return: a tuple of two lists, the first one with the node
819
        names and the second one with the node addresses
820

821
    """
822
    name_list = self._nodes.keys()
823
    addr_list = [self._nodes[name] for name in name_list]
824
    return name_list, addr_list
825

    
826
  def _UpdateJobQueueFile(self, file_name, data, replicate):
827
    """Writes a file locally and then replicates it to all nodes.
828

829
    This function will replace the contents of a file on the local
830
    node and then replicate it to all the other nodes we have.
831

832
    @type file_name: str
833
    @param file_name: the path of the file to be replicated
834
    @type data: str
835
    @param data: the new contents of the file
836
    @type replicate: boolean
837
    @param replicate: whether to spread the changes to the remote nodes
838

839
    """
840
    utils.WriteFile(file_name, data=data)
841

    
842
    if replicate:
843
      names, addrs = self._GetNodeIp()
844
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
845
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
846

    
847
  def _RenameFilesUnlocked(self, rename):
848
    """Renames a file locally and then replicate the change.
849

850
    This function will rename a file in the local queue directory
851
    and then replicate this rename to all the other nodes we have.
852

853
    @type rename: list of (old, new)
854
    @param rename: List containing tuples mapping old to new names
855

856
    """
857
    # Rename them locally
858
    for old, new in rename:
859
      utils.RenameFile(old, new, mkdir=True)
860

    
861
    # ... and on all nodes
862
    names, addrs = self._GetNodeIp()
863
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
864
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
865

    
866
  @staticmethod
867
  def _FormatJobID(job_id):
868
    """Convert a job ID to string format.
869

870
    Currently this just does C{str(job_id)} after performing some
871
    checks, but if we want to change the job id format this will
872
    abstract this change.
873

874
    @type job_id: int or long
875
    @param job_id: the numeric job id
876
    @rtype: str
877
    @return: the formatted job id
878

879
    """
880
    if not isinstance(job_id, (int, long)):
881
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
882
    if job_id < 0:
883
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
884

    
885
    return str(job_id)
886

    
887
  @classmethod
888
  def _GetArchiveDirectory(cls, job_id):
889
    """Returns the archive directory for a job.
890

891
    @type job_id: str
892
    @param job_id: Job identifier
893
    @rtype: str
894
    @return: Directory name
895

896
    """
897
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
898

    
899
  def _NewSerialsUnlocked(self, count):
900
    """Generates a new job identifier.
901

902
    Job identifiers are unique during the lifetime of a cluster.
903

904
    @type count: integer
905
    @param count: how many serials to return
906
    @rtype: list of str
907
    @return: the list of job identifiers
908

909
    """
910
    assert count > 0
911
    # New number
912
    serial = self._last_serial + count
913

    
914
    # Write to file
915
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
916
                             "%s\n" % serial, True)
917

    
918
    result = [self._FormatJobID(v)
919
              for v in range(self._last_serial + 1, serial + 1)]
920
    # Keep it only if we were able to write the file
921
    self._last_serial = serial
922

    
923
    return result
924

    
925
  @staticmethod
926
  def _GetJobPath(job_id):
927
    """Returns the job file for a given job id.
928

929
    @type job_id: str
930
    @param job_id: the job identifier
931
    @rtype: str
932
    @return: the path to the job file
933

934
    """
935
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
936

    
937
  @classmethod
938
  def _GetArchivedJobPath(cls, job_id):
939
    """Returns the archived job file for a give job id.
940

941
    @type job_id: str
942
    @param job_id: the job identifier
943
    @rtype: str
944
    @return: the path to the archived job file
945

946
    """
947
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
948
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
949

    
950
  def _GetJobIDsUnlocked(self, sort=True):
951
    """Return all known job IDs.
952

953
    The method only looks at disk because it's a requirement that all
954
    jobs are present on disk (so in the _memcache we don't have any
955
    extra IDs).
956

957
    @type sort: boolean
958
    @param sort: perform sorting on the returned job ids
959
    @rtype: list
960
    @return: the list of job IDs
961

962
    """
963
    jlist = []
964
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
965
      m = self._RE_JOB_FILE.match(filename)
966
      if m:
967
        jlist.append(m.group(1))
968
    if sort:
969
      jlist = utils.NiceSort(jlist)
970
    return jlist
971

    
972
  def _LoadJobUnlocked(self, job_id):
973
    """Loads a job from the disk or memory.
974

975
    Given a job id, this will return the cached job object if
976
    existing, or try to load the job from the disk. If loading from
977
    disk, it will also add the job to the cache.
978

979
    @param job_id: the job id
980
    @rtype: L{_QueuedJob} or None
981
    @return: either None or the job object
982

983
    """
984
    job = self._memcache.get(job_id, None)
985
    if job:
986
      logging.debug("Found job %s in memcache", job_id)
987
      return job
988

    
989
    try:
990
      job = self._LoadJobFromDisk(job_id)
991
    except errors.JobFileCorrupted:
992
      old_path = self._GetJobPath(job_id)
993
      new_path = self._GetArchivedJobPath(job_id)
994
      if old_path == new_path:
995
        # job already archived (future case)
996
        logging.exception("Can't parse job %s", job_id)
997
      else:
998
        # non-archived case
999
        logging.exception("Can't parse job %s, will archive.", job_id)
1000
        self._RenameFilesUnlocked([(old_path, new_path)])
1001
      return None
1002

    
1003
    self._memcache[job_id] = job
1004
    logging.debug("Added job %s to the cache", job_id)
1005
    return job
1006

    
1007
  def _LoadJobFromDisk(self, job_id):
1008
    """Load the given job file from disk.
1009

1010
    Given a job file, read, load and restore it in a _QueuedJob format.
1011

1012
    @type job_id: string
1013
    @param job_id: job identifier
1014
    @rtype: L{_QueuedJob} or None
1015
    @return: either None or the job object
1016

1017
    """
1018
    filepath = self._GetJobPath(job_id)
1019
    logging.debug("Loading job from %s", filepath)
1020
    try:
1021
      raw_data = utils.ReadFile(filepath)
1022
    except EnvironmentError, err:
1023
      if err.errno in (errno.ENOENT, ):
1024
        return None
1025
      raise
1026

    
1027
    try:
1028
      data = serializer.LoadJson(raw_data)
1029
      job = _QueuedJob.Restore(self, data)
1030
    except Exception, err: # pylint: disable-msg=W0703
1031
      raise errors.JobFileCorrupted(err)
1032

    
1033
    return job
1034

    
1035
  def SafeLoadJobFromDisk(self, job_id):
1036
    """Load the given job file from disk.
1037

1038
    Given a job file, read, load and restore it in a _QueuedJob format.
1039
    In case of error reading the job, None is returned and the
1040
    exception is logged.
1041

1042
    @type job_id: string
1043
    @param job_id: job identifier
1044
    @rtype: L{_QueuedJob} or None
1045
    @return: either None or the job object
1046

1047
    """
1048
    try:
1049
      return self._LoadJobFromDisk(job_id)
1050
    except (errors.JobFileCorrupted, EnvironmentError):
1051
      logging.exception("Can't load/parse job %s", job_id)
1052
      return None
1053

    
1054
  @staticmethod
1055
  def _IsQueueMarkedDrain():
1056
    """Check if the queue is marked from drain.
1057

1058
    This currently uses the queue drain file, which makes it a
1059
    per-node flag. In the future this can be moved to the config file.
1060

1061
    @rtype: boolean
1062
    @return: True if the job queue is marked for draining
1063

1064
    """
1065
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1066

    
1067
  def _UpdateQueueSizeUnlocked(self):
1068
    """Update the queue size.
1069

1070
    """
1071
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1072

    
1073
  @utils.LockedMethod
1074
  @_RequireOpenQueue
1075
  def SetDrainFlag(self, drain_flag):
1076
    """Sets the drain flag for the queue.
1077

1078
    @type drain_flag: boolean
1079
    @param drain_flag: Whether to set or unset the drain flag
1080

1081
    """
1082
    if drain_flag:
1083
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1084
    else:
1085
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1086

    
1087
    self._drained = drain_flag
1088

    
1089
    return True
1090

    
1091
  @_RequireOpenQueue
1092
  def _SubmitJobUnlocked(self, job_id, ops):
1093
    """Create and store a new job.
1094

1095
    This enters the job into our job queue and also puts it on the new
1096
    queue, in order for it to be picked up by the queue processors.
1097

1098
    @type job_id: job ID
1099
    @param job_id: the job ID for the new job
1100
    @type ops: list
1101
    @param ops: The list of OpCodes that will become the new job.
1102
    @rtype: L{_QueuedJob}
1103
    @return: the job object to be queued
1104
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1105
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
1106

1107
    """
1108
    # Ok when sharing the big job queue lock, as the drain file is created when
1109
    # the lock is exclusive.
1110
    if self._drained:
1111
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1112

    
1113
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1114
      raise errors.JobQueueFull()
1115

    
1116
    job = _QueuedJob(self, job_id, ops)
1117

    
1118
    # Write to disk
1119
    self.UpdateJobUnlocked(job)
1120

    
1121
    self._queue_size += 1
1122

    
1123
    logging.debug("Adding new job %s to the cache", job_id)
1124
    self._memcache[job_id] = job
1125

    
1126
    return job
1127

    
1128
  @utils.LockedMethod
1129
  @_RequireOpenQueue
1130
  def SubmitJob(self, ops):
1131
    """Create and store a new job.
1132

1133
    @see: L{_SubmitJobUnlocked}
1134

1135
    """
1136
    job_id = self._NewSerialsUnlocked(1)[0]
1137
    self._wpool.AddTask(self._SubmitJobUnlocked(job_id, ops))
1138
    return job_id
1139

    
1140
  @utils.LockedMethod
1141
  @_RequireOpenQueue
1142
  def SubmitManyJobs(self, jobs):
1143
    """Create and store multiple jobs.
1144

1145
    @see: L{_SubmitJobUnlocked}
1146

1147
    """
1148
    results = []
1149
    tasks = []
1150
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1151
    for job_id, ops in zip(all_job_ids, jobs):
1152
      try:
1153
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
1154
        status = True
1155
        data = job_id
1156
      except errors.GenericError, err:
1157
        data = str(err)
1158
        status = False
1159
      results.append((status, data))
1160
    self._wpool.AddManyTasks(tasks)
1161

    
1162
    return results
1163

    
1164
  @_RequireOpenQueue
1165
  def UpdateJobUnlocked(self, job, replicate=True):
1166
    """Update a job's on disk storage.
1167

1168
    After a job has been modified, this function needs to be called in
1169
    order to write the changes to disk and replicate them to the other
1170
    nodes.
1171

1172
    @type job: L{_QueuedJob}
1173
    @param job: the changed job
1174
    @type replicate: boolean
1175
    @param replicate: whether to replicate the change to remote nodes
1176

1177
    """
1178
    filename = self._GetJobPath(job.id)
1179
    data = serializer.DumpJson(job.Serialize(), indent=False)
1180
    logging.debug("Writing job %s to %s", job.id, filename)
1181
    self._UpdateJobQueueFile(filename, data, replicate)
1182

    
1183
    # Notify waiters about potential changes
1184
    job.change.notifyAll()
1185

    
1186
  @utils.LockedMethod
1187
  @_RequireOpenQueue
1188
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1189
                        timeout):
1190
    """Waits for changes in a job.
1191

1192
    @type job_id: string
1193
    @param job_id: Job identifier
1194
    @type fields: list of strings
1195
    @param fields: Which fields to check for changes
1196
    @type prev_job_info: list or None
1197
    @param prev_job_info: Last job information returned
1198
    @type prev_log_serial: int
1199
    @param prev_log_serial: Last job message serial number
1200
    @type timeout: float
1201
    @param timeout: maximum time to wait
1202
    @rtype: tuple (job info, log entries)
1203
    @return: a tuple of the job information as required via
1204
        the fields parameter, and the log entries as a list
1205

1206
        if the job has not changed and the timeout has expired,
1207
        we instead return a special value,
1208
        L{constants.JOB_NOTCHANGED}, which should be interpreted
1209
        as such by the clients
1210

1211
    """
1212
    job = self._LoadJobUnlocked(job_id)
1213
    if not job:
1214
      logging.debug("Job %s not found", job_id)
1215
      return None
1216

    
1217
    def _CheckForChanges():
1218
      logging.debug("Waiting for changes in job %s", job_id)
1219

    
1220
      status = job.CalcStatus()
1221
      job_info = job.GetInfo(fields)
1222
      log_entries = job.GetLogEntries(prev_log_serial)
1223

    
1224
      # Serializing and deserializing data can cause type changes (e.g. from
1225
      # tuple to list) or precision loss. We're doing it here so that we get
1226
      # the same modifications as the data received from the client. Without
1227
      # this, the comparison afterwards might fail without the data being
1228
      # significantly different.
1229
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1230
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1231

    
1232
      # Don't even try to wait if the job is no longer running, there will be
1233
      # no changes.
1234
      if (status not in (constants.JOB_STATUS_QUEUED,
1235
                         constants.JOB_STATUS_RUNNING,
1236
                         constants.JOB_STATUS_WAITLOCK) or
1237
          prev_job_info != job_info or
1238
          (log_entries and prev_log_serial != log_entries[0][0])):
1239
        logging.debug("Job %s changed", job_id)
1240
        return (job_info, log_entries)
1241

    
1242
      raise utils.RetryAgain()
1243

    
1244
    try:
1245
      # Setting wait function to release the queue lock while waiting
1246
      return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
1247
                         wait_fn=job.change.wait)
1248
    except utils.RetryTimeout:
1249
      return constants.JOB_NOTCHANGED
1250

    
1251
  @utils.LockedMethod
1252
  @_RequireOpenQueue
1253
  def CancelJob(self, job_id):
1254
    """Cancels a job.
1255

1256
    This will only succeed if the job has not started yet.
1257

1258
    @type job_id: string
1259
    @param job_id: job ID of job to be cancelled.
1260

1261
    """
1262
    logging.info("Cancelling job %s", job_id)
1263

    
1264
    job = self._LoadJobUnlocked(job_id)
1265
    if not job:
1266
      logging.debug("Job %s not found", job_id)
1267
      return (False, "Job %s not found" % job_id)
1268

    
1269
    job_status = job.CalcStatus()
1270

    
1271
    if job_status not in (constants.JOB_STATUS_QUEUED,
1272
                          constants.JOB_STATUS_WAITLOCK):
1273
      logging.debug("Job %s is no longer waiting in the queue", job.id)
1274
      return (False, "Job %s is no longer waiting in the queue" % job.id)
1275

    
1276
    if job_status == constants.JOB_STATUS_QUEUED:
1277
      self.CancelJobUnlocked(job)
1278
      return (True, "Job %s canceled" % job.id)
1279

    
1280
    elif job_status == constants.JOB_STATUS_WAITLOCK:
1281
      # The worker will notice the new status and cancel the job
1282
      try:
1283
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1284
      finally:
1285
        self.UpdateJobUnlocked(job)
1286
      return (True, "Job %s will be canceled" % job.id)
1287

    
1288
  @_RequireOpenQueue
1289
  def CancelJobUnlocked(self, job):
1290
    """Marks a job as canceled.
1291

1292
    """
1293
    try:
1294
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1295
                            "Job canceled by request")
1296
    finally:
1297
      self.UpdateJobUnlocked(job)
1298

    
1299
  @_RequireOpenQueue
1300
  def _ArchiveJobsUnlocked(self, jobs):
1301
    """Archives jobs.
1302

1303
    @type jobs: list of L{_QueuedJob}
1304
    @param jobs: Job objects
1305
    @rtype: int
1306
    @return: Number of archived jobs
1307

1308
    """
1309
    archive_jobs = []
1310
    rename_files = []
1311
    for job in jobs:
1312
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1313
                                  constants.JOB_STATUS_SUCCESS,
1314
                                  constants.JOB_STATUS_ERROR):
1315
        logging.debug("Job %s is not yet done", job.id)
1316
        continue
1317

    
1318
      archive_jobs.append(job)
1319

    
1320
      old = self._GetJobPath(job.id)
1321
      new = self._GetArchivedJobPath(job.id)
1322
      rename_files.append((old, new))
1323

    
1324
    # TODO: What if 1..n files fail to rename?
1325
    self._RenameFilesUnlocked(rename_files)
1326

    
1327
    logging.debug("Successfully archived job(s) %s",
1328
                  utils.CommaJoin(job.id for job in archive_jobs))
1329

    
1330
    # Since we haven't quite checked, above, if we succeeded or failed renaming
1331
    # the files, we update the cached queue size from the filesystem. When we
1332
    # get around to fix the TODO: above, we can use the number of actually
1333
    # archived jobs to fix this.
1334
    self._UpdateQueueSizeUnlocked()
1335
    return len(archive_jobs)
1336

    
1337
  @utils.LockedMethod
1338
  @_RequireOpenQueue
1339
  def ArchiveJob(self, job_id):
1340
    """Archives a job.
1341

1342
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
1343

1344
    @type job_id: string
1345
    @param job_id: Job ID of job to be archived.
1346
    @rtype: bool
1347
    @return: Whether job was archived
1348

1349
    """
1350
    logging.info("Archiving job %s", job_id)
1351

    
1352
    job = self._LoadJobUnlocked(job_id)
1353
    if not job:
1354
      logging.debug("Job %s not found", job_id)
1355
      return False
1356

    
1357
    return self._ArchiveJobsUnlocked([job]) == 1
1358

    
1359
  @utils.LockedMethod
1360
  @_RequireOpenQueue
1361
  def AutoArchiveJobs(self, age, timeout):
1362
    """Archives all jobs based on age.
1363

1364
    The method will archive all jobs which are older than the age
1365
    parameter. For jobs that don't have an end timestamp, the start
1366
    timestamp will be considered. The special '-1' age will cause
1367
    archival of all jobs (that are not running or queued).
1368

1369
    @type age: int
    @param age: the minimum age in seconds
    @type timeout: float
    @param timeout: the maximum amount of time to spend archiving
    @rtype: tuple
    @return: a tuple of (number of archived jobs, number of jobs not
        examined before the timeout expired)
1371

1372
    """
1373
    logging.info("Archiving jobs with age more than %s seconds", age)
1374

    
1375
    now = time.time()
1376
    end_time = now + timeout
1377
    archived_count = 0
1378
    last_touched = 0
1379

    
1380
    all_job_ids = self._GetJobIDsUnlocked()
1381
    pending = []
1382
    for idx, job_id in enumerate(all_job_ids):
1383
      last_touched = idx + 1
1384

    
1385
      # Not optimal because jobs could be pending
1386
      # TODO: Measure average duration for job archival and take number of
1387
      # pending jobs into account.
1388
      if time.time() > end_time:
1389
        break
1390

    
1391
      # Returns None if the job failed to load
1392
      job = self._LoadJobUnlocked(job_id)
1393
      if job:
1394
        if job.end_timestamp is None:
1395
          if job.start_timestamp is None:
1396
            job_age = job.received_timestamp
1397
          else:
1398
            job_age = job.start_timestamp
1399
        else:
1400
          job_age = job.end_timestamp
1401

    
1402
        if age == -1 or now - job_age[0] > age:
1403
          pending.append(job)
1404

    
1405
          # Archive 10 jobs at a time
1406
          if len(pending) >= 10:
1407
            archived_count += self._ArchiveJobsUnlocked(pending)
1408
            pending = []
1409

    
1410
    if pending:
1411
      archived_count += self._ArchiveJobsUnlocked(pending)
1412

    
1413
    return (archived_count, len(all_job_ids) - last_touched)
1414

    
1415
  def QueryJobs(self, job_ids, fields):
1416
    """Returns a list of jobs in queue.
1417

1418
    @type job_ids: list
1419
    @param job_ids: sequence of job identifiers or None for all
1420
    @type fields: list
1421
    @param fields: names of fields to return
1422
    @rtype: list
1423
    @return: a list with one element per job, each element being a
        list with the requested fields
1425

1426
    """
1427
    jobs = []
1428
    list_all = False
1429
    if not job_ids:
1430
      # Since files are added to/removed from the queue atomically, there's no
1431
      # risk of getting the job ids in an inconsistent state.
1432
      job_ids = self._GetJobIDsUnlocked()
1433
      list_all = True
1434

    
1435
    for job_id in job_ids:
1436
      job = self.SafeLoadJobFromDisk(job_id)
1437
      if job is not None:
1438
        jobs.append(job.GetInfo(fields))
1439
      elif not list_all:
1440
        jobs.append(None)
1441

    
1442
    return jobs
1443

    
1444
  @utils.LockedMethod
1445
  @_RequireOpenQueue
1446
  def Shutdown(self):
1447
    """Stops the job queue.
1448

1449
    This shuts down all the worker threads and closes the queue.
1450

1451
    """
1452
    self._wpool.TerminateWorkers()
1453

    
1454
    self._queue_filelock.Close()
1455
    self._queue_filelock = None