lib/jqueue.py @ c56ec146

#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking:
There's a single, large lock in the JobQueue class. It's used by all other
classes in this module.

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 5


def TimeStampNow():
  """Returns the current time split with utils.SplitTime.

  """
  return utils.SplitTime(time.time())
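
# Timestamps are kept as (seconds, microseconds) tuples rather than floats
# so they serialize to JSON without precision loss; e.g. 1230774839.64
# would become (1230774839, 640000) (illustrative value).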


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  The 'log' attribute holds the execution log and consists of tuples
  of the form (log_serial, timestamp, level, message).

  """
  def __init__(self, op):
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }
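
  # Note: Restore() is the inverse of Serialize(). A freshly queued opcode
  # would serialize to roughly (illustrative values):
  #   {"input": <opcode state>, "status": "queued", "result": None,
  #    "log": [], "start_timestamp": None, "end_timestamp": None}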


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must be taken
  care of by users of this class.

  """
  def __init__(self, queue, job_id, ops):
    if not ops:
      # TODO
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Computes the overall job status from the opcode statuses.

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def GetLogEntries(self, newer_than):
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries


class _JobQueueWorker(workerpool.BaseWorker):
  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              job.run_op_index = idx
              op.status = constants.OP_STATUS_RUNNING
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold the queue lock while _Log is called
            result = proc.ExecOpCode(input_opcode, _Log)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
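
  # Queue files are named after the job serial; assuming the usual numeric
  # constants.JOB_ID_TEMPLATE, this matches names like "job-1" or "job-4711"
  # (illustrative IDs) and captures the ID part.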

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all "public" functions. That is,
    functions usually called from other classes.

    Important: Use this decorator only after utils.LockedMethod!

    Example:
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = set(self.context.cfg.GetNodeList())

    # Remove master node
    try:
      self._nodes.remove(self._my_hostname)
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)

    # We need to lock here because WorkerPool.AddTask() may start a job while
    # we're still doing our work.
    self.acquire()
    try:
      for job in self._GetJobsUnlocked(None):
        status = job.CalcStatus()

        if status in (constants.JOB_STATUS_QUEUED, ):
          self._wpool.AddTask(job)

        elif status in (constants.JOB_STATUS_RUNNING, ):
          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            for op in job.ops:
              op.status = constants.OP_STATUS_ERROR
              op.result = "Unclean master daemon shutdown"
          finally:
            self.UpdateJobUnlocked(job)
    finally:
      self.release()

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node_name):
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.call_jobqueue_purge(node_name)

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.call_jobqueue_update([node_name], file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes.add(node_name)

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    try:
      # The queue is removed by the "leave node" RPC call.
      self._nodes.remove(node_name)
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    """
    utils.WriteFile(file_name, data=data)

    result = rpc.call_jobqueue_update(self._nodes, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFileUnlocked(self, old, new):
    os.rename(old, new)

    result = rpc.call_jobqueue_rename(self._nodes, old, new)
    self._CheckRpcResult(result, self._nodes,
                         "Moving %s to %s" % (old, new))

  def _FormatJobID(self, job_id):
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: A string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)

  @staticmethod
  def _GetJobPath(job_id):
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
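
  # For example (illustrative paths), job 4711 lives at
  # "<QUEUE_DIR>/job-4711" while active and moves to
  # "<JOB_QUEUE_ARCHIVE_DIR>/job-4711" once ArchiveJob renames it.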

  @classmethod
  def _ExtractJobID(cls, name):
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    job = _QueuedJob.Restore(self, data)
    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.

    """
    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id
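
  # Illustrative call from code holding a JobQueue reference; the opcode is
  # only an example:
  #   job_id = queue.SubmitJob([opcodes.OpQueryClusterInfo()])
  #   ((status, ), ) = queue.QueryJobs([job_id], ["status"])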

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @return: (job_info, log_entries) on changes, or constants.JOB_NOTCHANGED
      if the timeout expired without any change

    """
    logging.debug("Waiting for changes in job %s", job_id)
    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING):
        # Don't even try to wait if the job is no longer running, there will
        # be no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    @type job_id: string
    @param job_id: Job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
      logging.debug("Job %s is no longer in the queue", job.id)
      return

    try:
      for op in job.ops:
        # Mark the opcodes as canceled, not failed; CalcStatus and ArchiveJob
        # both distinguish the canceled state from errors.
        op.status = constants.OP_STATUS_CANCELED
        op.result = "Job cancelled by request"
    finally:
      self.UpdateJobUnlocked(job)

    
669
  @utils.LockedMethod
670
  @_RequireOpenQueue
671
  def ArchiveJob(self, job_id):
672
    """Archives a job.
673

674
    @type job_id: string
675
    @param job_id: Job ID of job to be archived.
676

677
    """
678
    logging.debug("Archiving job %s", job_id)
679

    
680
    job = self._LoadJobUnlocked(job_id)
681
    if not job:
682
      logging.debug("Job %s not found", job_id)
683
      return
684

    
685
    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
686
                                constants.JOB_STATUS_SUCCESS,
687
                                constants.JOB_STATUS_ERROR):
688
      logging.debug("Job %s is not yet done", job.id)
689
      return
690

    
691
    old = self._GetJobPath(job.id)
692
    new = self._GetArchivedJobPath(job.id)
693

    
694
    self._RenameFileUnlocked(old, new)
695

    
696
    logging.debug("Successfully archived job %s", job.id)

  def _GetJobInfoUnlocked(self, job, fields):
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None