lib/jqueue.py @ 0f9c08dc
#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      }


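# Illustrative note (example values only, not produced by this module as-is):
# a serialized _QueuedOpCode, as built by Serialize() above and accepted by
# Restore(), is a plain dict along these lines:
#
#   {"input": {"OP_ID": "OP_CLUSTER_VERIFY", ...},  # opcode __getstate__() dict
#    "status": constants.OP_STATUS_SUCCESS,
#    "result": None,
#    "log": [(1, (1234567890, 123456), constants.ELOG_MESSAGE, "hello")],
#    "start_timestamp": (1234567890, 120000),
#    "exec_timestamp": (1234567890, 123000),
#    "end_timestamp": (1234567891, 500000)}
#
# The timestamps are the (seconds, microseconds) pairs from TimeStampNow().
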
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar lock_status: In-memory locking information for debugging
  @ivar change: a Condition variable we use for waiting for job changes

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "lock_status", "change",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # In-memory attributes
    self.lock_status = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    # In-memory attributes
    obj.lock_status = None

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one that finished with an
        error, the job status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

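  # Illustrative examples for CalcStatus() (opcode statuses listed in
  # submission order, not exhaustive):
  #   [success, running, queued]  -> JOB_STATUS_RUNNING
  #   [success, error, queued]    -> JOB_STATUS_ERROR (first error wins)
  #   [success, success, success] -> JOB_STATUS_SUCCESS
  #   [queued, queued]            -> JOB_STATUS_QUEUED
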
  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(self.id)
      elif fname == "status":
        row.append(self.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in self.ops])
      elif fname == "opresult":
        row.append([op.result for op in self.ops])
      elif fname == "opstatus":
        row.append([op.status for op in self.ops])
      elif fname == "oplog":
        row.append([op.log for op in self.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in self.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in self.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in self.ops])
      elif fname == "received_ts":
        row.append(self.received_timestamp)
      elif fname == "start_ts":
        row.append(self.start_timestamp)
      elif fname == "end_ts":
        row.append(self.end_timestamp)
      elif fname == "lock_status":
        row.append(self.lock_status)
      elif fname == "summary":
        row.append([op.input.Summary() for op in self.ops])
      else:
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
    return row

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    self._queue.acquire()
    try:
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                                 constants.OP_STATUS_CANCELING)

      # All locks are acquired by now
      self._job.lock_status = None

      # Cancel here if we were asked to
      if self._op.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self._op.status = constants.OP_STATUS_RUNNING
      self._op.exec_timestamp = TimeStampNow()
    finally:
      self._queue.release()

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())

    self._queue.acquire()
    try:
      self._job.log_serial += 1
      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))

      self._job.change.notifyAll()
    finally:
      self._queue.release()

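  # Illustrative note (example values only): each Feedback() call appends one
  # tuple of the form (log_serial, (seconds, microseconds), type, message) to
  # the opcode log, e.g.:
  #   (3, (1234567890, 123456), constants.ELOG_MESSAGE, "creating disks")
  # The serial is global per job, so GetLogEntries(newer_than) can return only
  # the entries a client has not seen yet.
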
  def ReportLocks(self, msg):
    """Write locking information to the job.

    Called whenever the LU processor is waiting for a lock or has acquired one.

    """
    # Not getting the queue lock because this is a single assignment
    self._job.lock_status = msg


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Processing job %s", job.id)
    proc = mcpu.Processor(self.pool.queue.context, job.id)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire()
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            # Make sure not to hold queue lock while calling ExecOpCode
            result = proc.ExecOpCode(input_opcode,
                                     _OpExecCallbacks(queue, job, op))

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                if isinstance(err, errors.GenericError):
                  op.result = errors.EncodeException(err)
                else:
                  op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.lock_status = None
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()

      logging.info("Finished job %s, status = %s", job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  @warning: Use this decorator only after utils.LockedMethod!

  Example::
    @utils.LockedMethod
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable-msg=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = 0
    self._UpdateQueueSizeUnlocked()
    self._drained = self._IsQueueMarkedDrain()

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                    "Unclean master daemon shutdown")
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    utils.WriteFile(file_name, data=data)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)

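  # Illustrative note: with JOBS_PER_ARCHIVE_DIRECTORY = 10000, job "12345"
  # is archived under directory "1" (Python 2 integer division), i.e. as
  # <JOB_QUEUE_ARCHIVE_DIR>/1/job-12345; see _GetArchivedJobPath below.
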
  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of job identifiers.

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result

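  # Worked example (values made up): if self._last_serial is 5 and three
  # serials are requested, "8" is written to the serial file, the method
  # returns ["6", "7", "8"] and self._last_serial becomes 8.
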
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)

  def _GetJobIDsUnlocked(self, sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
      m = self._RE_JOB_FILE.match(filename)
      if m:
        jlist.append(m.group(1))
    if sort:
      jlist = utils.NiceSort(jlist)
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    try:
      job = self._LoadJobFromDisk(job_id)
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      raw_data = utils.ReadFile(filepath)
    except EnvironmentError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data)
    except Exception, err: # pylint: disable-msg=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  def SafeLoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    If an error occurs while reading the job, None is returned and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for draining.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @utils.LockedMethod
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    self._wpool.AddTask(self._SubmitJobUnlocked(job_id, ops))
    return job_id

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    tasks = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
        status = True
        data = job_id
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))
    self._wpool.AddManyTasks(tasks)

    return results

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return None

    def _CheckForChanges():
      logging.debug("Waiting for changes in job %s", job_id)

      status = job.CalcStatus()
      job_info = job.GetInfo(fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      # Don't even try to wait if the job is no longer running, there will be
      # no changes.
      if (status not in (constants.JOB_STATUS_QUEUED,
                         constants.JOB_STATUS_RUNNING,
                         constants.JOB_STATUS_WAITLOCK) or
          prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        logging.debug("Job %s changed", job_id)
        return (job_info, log_entries)

      raise utils.RetryAgain()

    try:
      # Setting wait function to release the queue lock while waiting
      return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
                         wait_fn=job.change.wait)
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer waiting in the queue", job.id)
      return (False, "Job %s is no longer waiting in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    """
    try:
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                            "Job canceled by request")
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                  constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR):
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @param timeout: the maximum time (in seconds) to spend archiving jobs

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element per job, each element being a list
        with the requested fields

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(job.GetInfo(fields))

    return jobs

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None