root / lib / jqueue.py @ 6c2549d6

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import os
33
import logging
34
import threading
35
import errno
36
import re
37
import time
38
import weakref
39

    
40
try:
41
  # pylint: disable-msg=E0611
42
  from pyinotify import pyinotify
43
except ImportError:
44
  import pyinotify
45

    
46
from ganeti import asyncnotifier
47
from ganeti import constants
48
from ganeti import serializer
49
from ganeti import workerpool
50
from ganeti import opcodes
51
from ganeti import errors
52
from ganeti import mcpu
53
from ganeti import utils
54
from ganeti import jstore
55
from ganeti import rpc
56

    
57

    
58
JOBQUEUE_THREADS = 25
59
JOBS_PER_ARCHIVE_DIRECTORY = 10000
60

    
61

    
62
class CancelJob(Exception):
63
  """Special exception to cancel a job.
64

65
  """
66

    
67

    
68
def TimeStampNow():
69
  """Returns the current timestamp.
70

71
  @rtype: tuple
72
  @return: the current time in the (seconds, microseconds) format
73

74
  """
75
  return utils.SplitTime(time.time())
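
# Illustrative note, not part of the original module: utils.SplitTime turns a
# float timestamp into the (seconds, microseconds) tuple used throughout this
# file, e.g. (assuming it behaves as the docstring above describes):
#
#   >>> utils.SplitTime(1234567890.25)
#   (1234567890, 250000)
#
# Keeping both parts as integers makes the value easy to serialize to JSON
# without floating-point precision loss.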
76

    
77

    
78
class _QueuedOpCode(object):
79
  """Encapsulates an opcode object.
80

81
  @ivar log: holds the execution log and consists of tuples
82
  of the form C{(log_serial, timestamp, level, message)}
83
  @ivar input: the OpCode we encapsulate
84
  @ivar status: the current status
85
  @ivar result: the result of the LU execution
86
  @ivar start_timestamp: timestamp for the start of the execution
87
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
88
  @ivar end_timestamp: timestamp for the end of the execution
89

90
  """
91
  __slots__ = ["input", "status", "result", "log",
92
               "start_timestamp", "exec_timestamp", "end_timestamp",
93
               "__weakref__"]
94

    
95
  def __init__(self, op):
96
    """Constructor for the _QuededOpCode.
97

98
    @type op: L{opcodes.OpCode}
99
    @param op: the opcode we encapsulate
100

101
    """
102
    self.input = op
103
    self.status = constants.OP_STATUS_QUEUED
104
    self.result = None
105
    self.log = []
106
    self.start_timestamp = None
107
    self.exec_timestamp = None
108
    self.end_timestamp = None
109

    
110
  @classmethod
111
  def Restore(cls, state):
112
    """Restore the _QueuedOpCode from the serialized form.
113

114
    @type state: dict
115
    @param state: the serialized state
116
    @rtype: _QueuedOpCode
117
    @return: a new _QueuedOpCode instance
118

119
    """
120
    obj = _QueuedOpCode.__new__(cls)
121
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
122
    obj.status = state["status"]
123
    obj.result = state["result"]
124
    obj.log = state["log"]
125
    obj.start_timestamp = state.get("start_timestamp", None)
126
    obj.exec_timestamp = state.get("exec_timestamp", None)
127
    obj.end_timestamp = state.get("end_timestamp", None)
128
    return obj
129

    
130
  def Serialize(self):
131
    """Serializes this _QueuedOpCode.
132

133
    @rtype: dict
134
    @return: the dictionary holding the serialized state
135

136
    """
137
    return {
138
      "input": self.input.__getstate__(),
139
      "status": self.status,
140
      "result": self.result,
141
      "log": self.log,
142
      "start_timestamp": self.start_timestamp,
143
      "exec_timestamp": self.exec_timestamp,
144
      "end_timestamp": self.end_timestamp,
145
      }
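
# Illustrative sketch, not part of the original file: Serialize() and
# Restore() are intended to round-trip an opcode through the JSON job file.
# Assuming a simple opcode type such as opcodes.OpTestDelay is available,
# something like the following should hold:
#
#   >>> qop = _QueuedOpCode(opcodes.OpTestDelay(duration=0))
#   >>> state = qop.Serialize()
#   >>> restored = _QueuedOpCode.Restore(state)
#   >>> restored.status == constants.OP_STATUS_QUEUED
#   True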
146

    
147

    
148
class _QueuedJob(object):
149
  """In-memory job representation.
150

151
  This is what we use to track the user-submitted jobs. Locking must
152
  be taken care of by users of this class.
153

154
  @type queue: L{JobQueue}
155
  @ivar queue: the parent queue
156
  @ivar id: the job ID
157
  @type ops: list
158
  @ivar ops: the list of _QueuedOpCode that constitute the job
159
  @type log_serial: int
160
  @ivar log_serial: holds the index for the next log entry
161
  @ivar received_timestamp: the timestamp for when the job was received
162
  @ivar start_timestamp: the timestamp for start of execution
163
  @ivar end_timestamp: the timestamp for end of execution
164
  @ivar lock_status: In-memory locking information for debugging
165
  @ivar change: a Condition variable we use for waiting for job changes
166

167
  """
168
  # pylint: disable-msg=W0212
169
  __slots__ = ["queue", "id", "ops", "log_serial",
170
               "received_timestamp", "start_timestamp", "end_timestamp",
171
               "lock_status", "change",
172
               "__weakref__"]
173

    
174
  def __init__(self, queue, job_id, ops):
175
    """Constructor for the _QueuedJob.
176

177
    @type queue: L{JobQueue}
178
    @param queue: our parent queue
179
    @type job_id: job_id
180
    @param job_id: our job id
181
    @type ops: list
182
    @param ops: the list of opcodes we hold, which will be encapsulated
183
        in _QueuedOpCodes
184

185
    """
186
    if not ops:
187
      raise errors.GenericError("A job needs at least one opcode")
188

    
189
    self.queue = queue
190
    self.id = job_id
191
    self.ops = [_QueuedOpCode(op) for op in ops]
192
    self.log_serial = 0
193
    self.received_timestamp = TimeStampNow()
194
    self.start_timestamp = None
195
    self.end_timestamp = None
196

    
197
    # In-memory attributes
198
    self.lock_status = None
199

    
200
    # Condition to wait for changes
201
    self.change = threading.Condition(self.queue._lock)
202

    
203
  def __repr__(self):
204
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
205
              "id=%s" % self.id,
206
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
207

    
208
    return "<%s at %#x>" % (" ".join(status), id(self))
209

    
210
  @classmethod
211
  def Restore(cls, queue, state):
212
    """Restore a _QueuedJob from serialized state:
213

214
    @type queue: L{JobQueue}
215
    @param queue: to which queue the restored job belongs
216
    @type state: dict
217
    @param state: the serialized state
218
    @rtype: _QueuedJob
219
    @return: the restored _QueuedJob instance
220

221
    """
222
    obj = _QueuedJob.__new__(cls)
223
    obj.queue = queue
224
    obj.id = state["id"]
225
    obj.received_timestamp = state.get("received_timestamp", None)
226
    obj.start_timestamp = state.get("start_timestamp", None)
227
    obj.end_timestamp = state.get("end_timestamp", None)
228

    
229
    # In-memory attributes
230
    obj.lock_status = None
231

    
232
    obj.ops = []
233
    obj.log_serial = 0
234
    for op_state in state["ops"]:
235
      op = _QueuedOpCode.Restore(op_state)
236
      for log_entry in op.log:
237
        obj.log_serial = max(obj.log_serial, log_entry[0])
238
      obj.ops.append(op)
239

    
240
    # Condition to wait for changes
241
    obj.change = threading.Condition(obj.queue._lock)
242

    
243
    return obj
244

    
245
  def Serialize(self):
246
    """Serialize the _JobQueue instance.
247

248
    @rtype: dict
249
    @return: the serialized state
250

251
    """
252
    return {
253
      "id": self.id,
254
      "ops": [op.Serialize() for op in self.ops],
255
      "start_timestamp": self.start_timestamp,
256
      "end_timestamp": self.end_timestamp,
257
      "received_timestamp": self.received_timestamp,
258
      }
259

    
260
  def CalcStatus(self):
261
    """Compute the status of this job.
262

263
    This function iterates over all the _QueuedOpCodes in the job and
264
    based on their status, computes the job status.
265

266
    The algorithm is:
267
      - if we find a cancelled opcode, or one that finished with an
268
        error, the job status will be the same
269
      - otherwise, the last opcode with the status one of:
270
          - waitlock
271
          - canceling
272
          - running
273

274
        will determine the job status
275

276
      - otherwise, it means either all opcodes are queued or have
277
        succeeded, and the job status will be the same
278

279
    @return: the job status
280

281
    """
282
    status = constants.JOB_STATUS_QUEUED
283

    
284
    all_success = True
285
    for op in self.ops:
286
      if op.status == constants.OP_STATUS_SUCCESS:
287
        continue
288

    
289
      all_success = False
290

    
291
      if op.status == constants.OP_STATUS_QUEUED:
292
        pass
293
      elif op.status == constants.OP_STATUS_WAITLOCK:
294
        status = constants.JOB_STATUS_WAITLOCK
295
      elif op.status == constants.OP_STATUS_RUNNING:
296
        status = constants.JOB_STATUS_RUNNING
297
      elif op.status == constants.OP_STATUS_CANCELING:
298
        status = constants.JOB_STATUS_CANCELING
299
        break
300
      elif op.status == constants.OP_STATUS_ERROR:
301
        status = constants.JOB_STATUS_ERROR
302
        # The whole job fails if one opcode failed
303
        break
304
      elif op.status == constants.OP_STATUS_CANCELED:
305
        status = constants.JOB_STATUS_CANCELED
306
        break
307

    
308
    if all_success:
309
      status = constants.JOB_STATUS_SUCCESS
310

    
311
    return status
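
  # Illustrative example, not part of the original file: given the precedence
  # rules documented above, a job whose opcodes have the statuses
  # (success, running, queued) is reported as running, while a single errored
  # opcode marks the whole job as an error. Using the constants' string
  # values only for illustration:
  #
  #   >>> [op.status for op in job.ops]
  #   ['success', 'running', 'queued']
  #   >>> job.CalcStatus()
  #   'running'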
312

    
313
  def GetLogEntries(self, newer_than):
314
    """Selectively returns the log entries.
315

316
    @type newer_than: None or int
317
    @param newer_than: if this is None, return all log entries,
318
        otherwise return only the log entries with serial higher
319
        than this value
320
    @rtype: list
321
    @return: the list of the log entries selected
322

323
    """
324
    if newer_than is None:
325
      serial = -1
326
    else:
327
      serial = newer_than
328

    
329
    entries = []
330
    for op in self.ops:
331
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
332

    
333
    return entries
334

    
335
  def GetInfo(self, fields):
336
    """Returns information about a job.
337

338
    @type fields: list
339
    @param fields: names of fields to return
340
    @rtype: list
341
    @return: list with one element for each field
342
    @raise errors.OpExecError: when an invalid field
343
        has been passed
344

345
    """
346
    row = []
347
    for fname in fields:
348
      if fname == "id":
349
        row.append(self.id)
350
      elif fname == "status":
351
        row.append(self.CalcStatus())
352
      elif fname == "ops":
353
        row.append([op.input.__getstate__() for op in self.ops])
354
      elif fname == "opresult":
355
        row.append([op.result for op in self.ops])
356
      elif fname == "opstatus":
357
        row.append([op.status for op in self.ops])
358
      elif fname == "oplog":
359
        row.append([op.log for op in self.ops])
360
      elif fname == "opstart":
361
        row.append([op.start_timestamp for op in self.ops])
362
      elif fname == "opexec":
363
        row.append([op.exec_timestamp for op in self.ops])
364
      elif fname == "opend":
365
        row.append([op.end_timestamp for op in self.ops])
366
      elif fname == "received_ts":
367
        row.append(self.received_timestamp)
368
      elif fname == "start_ts":
369
        row.append(self.start_timestamp)
370
      elif fname == "end_ts":
371
        row.append(self.end_timestamp)
372
      elif fname == "lock_status":
373
        row.append(self.lock_status)
374
      elif fname == "summary":
375
        row.append([op.input.Summary() for op in self.ops])
376
      else:
377
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
378
    return row
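
  # Illustrative note, not part of the original file: the values come back in
  # the same order as the requested field names, so a query on a freshly
  # submitted job might look like (literal values only for illustration):
  #
  #   >>> job.GetInfo(["id", "status"])
  #   ['42', 'queued']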
379

    
380
  def MarkUnfinishedOps(self, status, result):
381
    """Mark unfinished opcodes with a given status and result.
382

383
    This is a utility function for marking all running or waiting to
384
    be run opcodes with a given status. Opcodes which are already
385
    finalised are not changed.
386

387
    @param status: a given opcode status
388
    @param result: the opcode result
389

390
    """
391
    not_marked = True
392
    for op in self.ops:
393
      if op.status in constants.OPS_FINALIZED:
394
        assert not_marked, "Finalized opcodes found after non-finalized ones"
395
        continue
396
      op.status = status
397
      op.result = result
398
      not_marked = False
399

    
400

    
401
class _OpExecCallbacks(mcpu.OpExecCbBase):
402
  def __init__(self, queue, job, op):
403
    """Initializes this class.
404

405
    @type queue: L{JobQueue}
406
    @param queue: Job queue
407
    @type job: L{_QueuedJob}
408
    @param job: Job object
409
    @type op: L{_QueuedOpCode}
410
    @param op: OpCode
411

412
    """
413
    assert queue, "Queue is missing"
414
    assert job, "Job is missing"
415
    assert op, "Opcode is missing"
416

    
417
    self._queue = queue
418
    self._job = job
419
    self._op = op
420

    
421
  def NotifyStart(self):
422
    """Mark the opcode as running, not lock-waiting.
423

424
    This is called from the mcpu code as a notifier function, when the LU is
425
    finally about to start the Exec() method. Of course, to have end-user
426
    visible results, the opcode must be initially (before calling into
427
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
428

429
    """
430
    self._queue.acquire()
431
    try:
432
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
433
                                 constants.OP_STATUS_CANCELING)
434

    
435
      # All locks are acquired by now
436
      self._job.lock_status = None
437

    
438
      # Cancel here if we were asked to
439
      if self._op.status == constants.OP_STATUS_CANCELING:
440
        raise CancelJob()
441

    
442
      self._op.status = constants.OP_STATUS_RUNNING
443
      self._op.exec_timestamp = TimeStampNow()
444
    finally:
445
      self._queue.release()
446

    
447
  def Feedback(self, *args):
448
    """Append a log entry.
449

450
    """
451
    assert len(args) < 3
452

    
453
    if len(args) == 1:
454
      log_type = constants.ELOG_MESSAGE
455
      log_msg = args[0]
456
    else:
457
      (log_type, log_msg) = args
458

    
459
    # The time is split to make serialization easier and not lose
460
    # precision.
461
    timestamp = utils.SplitTime(time.time())
462

    
463
    self._queue.acquire()
464
    try:
465
      self._job.log_serial += 1
466
      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
467
      self._queue.UpdateJobUnlocked(self._job, replicate=False)
468

    
469
      self._job.change.notifyAll()
470
    finally:
471
      self._queue.release()
472

    
473
  def ReportLocks(self, msg):
474
    """Write locking information to the job.
475

476
    Called whenever the LU processor is waiting for a lock or has acquired one.
477

478
    """
479
    # Not getting the queue lock because this is a single assignment
480
    self._job.lock_status = msg
481

    
482

    
483
class _WaitForJobChangesHelper(object):
484
  """Helper class using initofy to wait for changes in a job file.
485

486
  This class takes a previous job status and serial, and alerts the client when
487
  the current job status has changed.
488

489
  @type job_id: string
490
  @ivar job_id: id of the job we're watching
491
  @type prev_job_info: string
492
  @ivar prev_job_info: previous job info, as passed by the luxi client
493
  @type prev_log_serial: string
494
  @ivar prev_log_serial: previous job serial, as passed by the luxi client
495
  @type queue: L{JobQueue}
496
  @ivar queue: job queue (used for a few utility functions)
497
  @type job_path: string
498
  @ivar job_path: absolute path of the job file
499
  @type wm: pyinotify.WatchManager (or None)
500
  @ivar wm: inotify watch manager to watch for changes
501
  @type inotify_handler: L{asyncnotifier.SingleFileEventHandler}
502
  @ivar inotify_handler: single file event handler, used for watching
503
  @type notifier: pyinotify.Notifier
504
  @ivar notifier: inotify single-threaded notifier, used for watching
505

506
  """
507

    
508
  def __init__(self, job_id, fields, prev_job_info, prev_log_serial, queue):
509
    self.job_id = job_id
510
    self.fields = fields
511
    self.prev_job_info = prev_job_info
512
    self.prev_log_serial = prev_log_serial
513
    self.queue = queue
514
    # pylint: disable-msg=W0212
515
    self.job_path = self.queue._GetJobPath(self.job_id)
516
    self.wm = None
517
    self.inotify_handler = None
518
    self.notifier = None
519

    
520
  def _SetupInotify(self):
521
    """Create the inotify
522

523
    @raises errors.InotifyError: if the notifier cannot be set up
524

525
    """
526
    if self.wm:
527
      return
528
    self.wm = pyinotify.WatchManager()
529
    self.inotify_handler = asyncnotifier.SingleFileEventHandler(self.wm,
530
                                                                self.OnInotify,
531
                                                                self.job_path)
532
    self.notifier = pyinotify.Notifier(self.wm, self.inotify_handler)
533
    self.inotify_handler.enable()
534

    
535
  def _LoadDiskStatus(self):
536
    job = self.queue.SafeLoadJobFromDisk(self.job_id)
537
    if not job:
538
      raise errors.JobLost()
539
    self.job_status = job.CalcStatus()
540

    
541
    job_info = job.GetInfo(self.fields)
542
    log_entries = job.GetLogEntries(self.prev_log_serial)
543
    # Serializing and deserializing data can cause type changes (e.g. from
544
    # tuple to list) or precision loss. We're doing it here so that we get
545
    # the same modifications as the data received from the client. Without
546
    # this, the comparison afterwards might fail without the data being
547
    # significantly different.
548
    # TODO: we just deserialized from disk, investigate how to make sure that
549
    # the job info and log entries are compatible to avoid this further step.
550
    self.job_info = serializer.LoadJson(serializer.DumpJson(job_info))
551
    self.log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
552

    
553
  def _CheckForChanges(self):
554
    self._LoadDiskStatus()
555
    # Don't even try to wait if the job is no longer running, there will be
556
    # no changes.
557
    if (self.job_status not in (constants.JOB_STATUS_QUEUED,
558
                                constants.JOB_STATUS_RUNNING,
559
                                constants.JOB_STATUS_WAITLOCK) or
560
        self.prev_job_info != self.job_info or
561
        (self.log_entries and self.prev_log_serial != self.log_entries[0][0])):
562
      logging.debug("Job %s changed", self.job_id)
563
      return (self.job_info, self.log_entries)
564

    
565
    raise utils.RetryAgain()
566

    
567
  def OnInotify(self, notifier_enabled):
568
    if not notifier_enabled:
569
      self.inotify_handler.enable()
570

    
571
  def WaitFn(self, timeout):
572
    self._SetupInotify()
573
    if self.notifier.check_events(timeout*1000):
574
      self.notifier.read_events()
575
    self.notifier.process_events()
576

    
577
  def WaitForChanges(self, timeout):
578
    try:
579
      return utils.Retry(self._CheckForChanges,
580
                         utils.RETRY_REMAINING_TIME,
581
                         timeout,
582
                         wait_fn=self.WaitFn)
583
    except (errors.InotifyError, errors.JobLost):
584
      return None
585
    except utils.RetryTimeout:
586
      return constants.JOB_NOTCHANGED
587

    
588
  def Close(self):
589
    if self.wm:
590
      self.notifier.stop()
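
# Illustrative usage sketch, not part of the original file: this mirrors how
# JobQueue.WaitForJobChanges() further down drives the helper:
#
#   helper = _WaitForJobChangesHelper(job_id, ["status"], None, None, queue)
#   try:
#     result = helper.WaitForChanges(timeout)
#   finally:
#     helper.Close()
#
# On return, result is either a (job_info, log_entries) tuple, the special
# value constants.JOB_NOTCHANGED if the timeout expired without changes, or
# None if the job disappeared or inotify could not be set up.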
591

    
592

    
593
class _JobQueueWorker(workerpool.BaseWorker):
594
  """The actual job workers.
595

596
  """
597
  def RunTask(self, job): # pylint: disable-msg=W0221
598
    """Job executor.
599

600
    This function processes a job. It is closely tied to the _QueuedJob and
601
    _QueuedOpCode classes.
602

603
    @type job: L{_QueuedJob}
604
    @param job: the job to be processed
605

606
    """
607
    logging.info("Processing job %s", job.id)
608
    proc = mcpu.Processor(self.pool.queue.context, job.id)
609
    queue = job.queue
610
    try:
611
      try:
612
        count = len(job.ops)
613
        for idx, op in enumerate(job.ops):
614
          op_summary = op.input.Summary()
615
          if op.status == constants.OP_STATUS_SUCCESS:
616
            # this is a job that was partially completed before master
617
            # daemon shutdown, so it can be expected that some opcodes
618
            # are already completed successfully (if any did error
619
            # out, then the whole job should have been aborted and not
620
            # resubmitted for processing)
621
            logging.info("Op %s/%s: opcode %s already processed, skipping",
622
                         idx + 1, count, op_summary)
623
            continue
624
          try:
625
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
626
                         op_summary)
627

    
628
            queue.acquire()
629
            try:
630
              if op.status == constants.OP_STATUS_CANCELED:
631
                raise CancelJob()
632
              assert op.status == constants.OP_STATUS_QUEUED
633
              op.status = constants.OP_STATUS_WAITLOCK
634
              op.result = None
635
              op.start_timestamp = TimeStampNow()
636
              if idx == 0: # first opcode
637
                job.start_timestamp = op.start_timestamp
638
              queue.UpdateJobUnlocked(job)
639

    
640
              input_opcode = op.input
641
            finally:
642
              queue.release()
643

    
644
            # Make sure not to hold queue lock while calling ExecOpCode
645
            result = proc.ExecOpCode(input_opcode,
646
                                     _OpExecCallbacks(queue, job, op))
647

    
648
            queue.acquire()
649
            try:
650
              op.status = constants.OP_STATUS_SUCCESS
651
              op.result = result
652
              op.end_timestamp = TimeStampNow()
653
              queue.UpdateJobUnlocked(job)
654
            finally:
655
              queue.release()
656

    
657
            logging.info("Op %s/%s: Successfully finished opcode %s",
658
                         idx + 1, count, op_summary)
659
          except CancelJob:
660
            # Will be handled further up
661
            raise
662
          except Exception, err:
663
            queue.acquire()
664
            try:
665
              try:
666
                op.status = constants.OP_STATUS_ERROR
667
                if isinstance(err, errors.GenericError):
668
                  op.result = errors.EncodeException(err)
669
                else:
670
                  op.result = str(err)
671
                op.end_timestamp = TimeStampNow()
672
                logging.info("Op %s/%s: Error in opcode %s: %s",
673
                             idx + 1, count, op_summary, err)
674
              finally:
675
                queue.UpdateJobUnlocked(job)
676
            finally:
677
              queue.release()
678
            raise
679

    
680
      except CancelJob:
681
        queue.acquire()
682
        try:
683
          queue.CancelJobUnlocked(job)
684
        finally:
685
          queue.release()
686
      except errors.GenericError, err:
687
        logging.exception("Ganeti exception")
688
      except:
689
        logging.exception("Unhandled exception")
690
    finally:
691
      queue.acquire()
692
      try:
693
        try:
694
          job.lock_status = None
695
          job.end_timestamp = TimeStampNow()
696
          queue.UpdateJobUnlocked(job)
697
        finally:
698
          job_id = job.id
699
          status = job.CalcStatus()
700
      finally:
701
        queue.release()
702

    
703
      logging.info("Finished job %s, status = %s", job_id, status)
704

    
705

    
706
class _JobQueueWorkerPool(workerpool.WorkerPool):
707
  """Simple class implementing a job-processing workerpool.
708

709
  """
710
  def __init__(self, queue):
711
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
712
                                              JOBQUEUE_THREADS,
713
                                              _JobQueueWorker)
714
    self.queue = queue
715

    
716

    
717
def _RequireOpenQueue(fn):
718
  """Decorator for "public" functions.
719

720
  This function should be used for all 'public' functions. That is,
721
  functions usually called from other classes. Note that this should
722
  be applied only to methods (not plain functions), since it expects
723
  that the decorated function is called with a first argument that has
724
    a '_queue_filelock' attribute.
725

726
  @warning: Use this decorator only after utils.LockedMethod!
727

728
  Example::
729
    @utils.LockedMethod
730
    @_RequireOpenQueue
731
    def Example(self):
732
      pass
733

734
  """
735
  def wrapper(self, *args, **kwargs):
736
    # pylint: disable-msg=W0212
737
    assert self._queue_filelock is not None, "Queue should be open"
738
    return fn(self, *args, **kwargs)
739
  return wrapper
740

    
741

    
742
class JobQueue(object):
743
  """Queue used to manage the jobs.
744

745
  @cvar _RE_JOB_FILE: regex matching the valid job file names
746

747
  """
748
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
749

    
750
  def __init__(self, context):
751
    """Constructor for JobQueue.
752

753
    The constructor will initialize the job queue object and then
754
    start loading the current jobs from disk, either for starting them
755
    (if they were queue) or for aborting them (if they were already
756
    running).
757

758
    @type context: GanetiContext
759
    @param context: the context object for access to the configuration
760
        data and other ganeti objects
761

762
    """
763
    self.context = context
764
    self._memcache = weakref.WeakValueDictionary()
765
    self._my_hostname = utils.HostInfo().name
766

    
767
    # Locking
768
    self._lock = threading.Lock()
769
    self.acquire = self._lock.acquire
770
    self.release = self._lock.release
771

    
772
    # Initialize the queue, and acquire the filelock.
773
    # This ensures no other process is working on the job queue.
774
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
775

    
776
    # Read serial file
777
    self._last_serial = jstore.ReadSerial()
778
    assert self._last_serial is not None, ("Serial file was modified between"
779
                                           " check in jstore and here")
780

    
781
    # Get initial list of nodes
782
    self._nodes = dict((n.name, n.primary_ip)
783
                       for n in self.context.cfg.GetAllNodesInfo().values()
784
                       if n.master_candidate)
785

    
786
    # Remove master node
787
    self._nodes.pop(self._my_hostname, None)
788

    
789
    # TODO: Check consistency across nodes
790

    
791
    self._queue_size = 0
792
    self._UpdateQueueSizeUnlocked()
793
    self._drained = self._IsQueueMarkedDrain()
794

    
795
    # Setup worker pool
796
    self._wpool = _JobQueueWorkerPool(self)
797
    try:
798
      # We need to lock here because WorkerPool.AddTask() may start a job while
799
      # we're still doing our work.
800
      self.acquire()
801
      try:
802
        logging.info("Inspecting job queue")
803

    
804
        all_job_ids = self._GetJobIDsUnlocked()
805
        jobs_count = len(all_job_ids)
806
        lastinfo = time.time()
807
        for idx, job_id in enumerate(all_job_ids):
808
          # Give an update every 1000 jobs or 10 seconds
809
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
810
              idx == (jobs_count - 1)):
811
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
812
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
813
            lastinfo = time.time()
814

    
815
          job = self._LoadJobUnlocked(job_id)
816

    
817
          # a failure in loading the job can cause 'None' to be returned
818
          if job is None:
819
            continue
820

    
821
          status = job.CalcStatus()
822

    
823
          if status in (constants.JOB_STATUS_QUEUED, ):
824
            self._wpool.AddTask(job)
825

    
826
          elif status in (constants.JOB_STATUS_RUNNING,
827
                          constants.JOB_STATUS_WAITLOCK,
828
                          constants.JOB_STATUS_CANCELING):
829
            logging.warning("Unfinished job %s found: %s", job.id, job)
830
            try:
831
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
832
                                    "Unclean master daemon shutdown")
833
            finally:
834
              self.UpdateJobUnlocked(job)
835

    
836
        logging.info("Job queue inspection finished")
837
      finally:
838
        self.release()
839
    except:
840
      self._wpool.TerminateWorkers()
841
      raise
842

    
843
  @utils.LockedMethod
844
  @_RequireOpenQueue
845
  def AddNode(self, node):
846
    """Register a new node with the queue.
847

848
    @type node: L{objects.Node}
849
    @param node: the node object to be added
850

851
    """
852
    node_name = node.name
853
    assert node_name != self._my_hostname
854

    
855
    # Clean queue directory on added node
856
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
857
    msg = result.fail_msg
858
    if msg:
859
      logging.warning("Cannot cleanup queue directory on node %s: %s",
860
                      node_name, msg)
861

    
862
    if not node.master_candidate:
863
      # remove if existing, ignoring errors
864
      self._nodes.pop(node_name, None)
865
      # and skip the replication of the job ids
866
      return
867

    
868
    # Upload the whole queue excluding archived jobs
869
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
870

    
871
    # Upload current serial file
872
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
873

    
874
    for file_name in files:
875
      # Read file content
876
      content = utils.ReadFile(file_name)
877

    
878
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
879
                                                  [node.primary_ip],
880
                                                  file_name, content)
881
      msg = result[node_name].fail_msg
882
      if msg:
883
        logging.error("Failed to upload file %s to node %s: %s",
884
                      file_name, node_name, msg)
885

    
886
    self._nodes[node_name] = node.primary_ip
887

    
888
  @utils.LockedMethod
889
  @_RequireOpenQueue
890
  def RemoveNode(self, node_name):
891
    """Callback called when removing nodes from the cluster.
892

893
    @type node_name: str
894
    @param node_name: the name of the node to remove
895

896
    """
897
    self._nodes.pop(node_name, None)
898

    
899
  @staticmethod
900
  def _CheckRpcResult(result, nodes, failmsg):
901
    """Verifies the status of an RPC call.
902

903
    Since we aim to keep consistency should this node (the current
904
    master) fail, we will log errors if our rpc calls fail, and especially
905
    log the case when more than half of the nodes fail.
906

907
    @param result: the data as returned from the rpc call
908
    @type nodes: list
909
    @param nodes: the list of nodes we made the call to
910
    @type failmsg: str
911
    @param failmsg: the identifier to be used for logging
912

913
    """
914
    failed = []
915
    success = []
916

    
917
    for node in nodes:
918
      msg = result[node].fail_msg
919
      if msg:
920
        failed.append(node)
921
        logging.error("RPC call %s (%s) failed on node %s: %s",
922
                      result[node].call, failmsg, node, msg)
923
      else:
924
        success.append(node)
925

    
926
    # +1 for the master node
927
    if (len(success) + 1) < len(failed):
928
      # TODO: Handle failing nodes
929
      logging.error("More than half of the nodes failed")
930

    
931
  def _GetNodeIp(self):
932
    """Helper for returning the node name/ip list.
933

934
    @rtype: (list, list)
935
    @return: a tuple of two lists, the first one with the node
936
        names and the second one with the node addresses
937

938
    """
939
    name_list = self._nodes.keys()
940
    addr_list = [self._nodes[name] for name in name_list]
941
    return name_list, addr_list
942

    
943
  def _UpdateJobQueueFile(self, file_name, data, replicate):
944
    """Writes a file locally and then replicates it to all nodes.
945

946
    This function will replace the contents of a file on the local
947
    node and then replicate it to all the other nodes we have.
948

949
    @type file_name: str
950
    @param file_name: the path of the file to be replicated
951
    @type data: str
952
    @param data: the new contents of the file
953
    @type replicate: boolean
954
    @param replicate: whether to spread the changes to the remote nodes
955

956
    """
957
    utils.WriteFile(file_name, data=data)
958

    
959
    if replicate:
960
      names, addrs = self._GetNodeIp()
961
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
962
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
963

    
964
  def _RenameFilesUnlocked(self, rename):
965
    """Renames a file locally and then replicate the change.
966

967
    This function will rename a file in the local queue directory
968
    and then replicate this rename to all the other nodes we have.
969

970
    @type rename: list of (old, new)
971
    @param rename: List containing tuples mapping old to new names
972

973
    """
974
    # Rename them locally
975
    for old, new in rename:
976
      utils.RenameFile(old, new, mkdir=True)
977

    
978
    # ... and on all nodes
979
    names, addrs = self._GetNodeIp()
980
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
981
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
982

    
983
  @staticmethod
984
  def _FormatJobID(job_id):
985
    """Convert a job ID to string format.
986

987
    Currently this just does C{str(job_id)} after performing some
988
    checks, but if we want to change the job id format this will
989
    abstract this change.
990

991
    @type job_id: int or long
992
    @param job_id: the numeric job id
993
    @rtype: str
994
    @return: the formatted job id
995

996
    """
997
    if not isinstance(job_id, (int, long)):
998
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
999
    if job_id < 0:
1000
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1001

    
1002
    return str(job_id)
1003

    
1004
  @classmethod
1005
  def _GetArchiveDirectory(cls, job_id):
1006
    """Returns the archive directory for a job.
1007

1008
    @type job_id: str
1009
    @param job_id: Job identifier
1010
    @rtype: str
1011
    @return: Directory name
1012

1013
    """
1014
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1015

    
1016
  def _NewSerialsUnlocked(self, count):
1017
    """Generates a new job identifier.
1018

1019
    Job identifiers are unique during the lifetime of a cluster.
1020

1021
    @type count: integer
1022
    @param count: how many serials to return
1023
    @rtype: list of str
1024
    @return: a list of strings representing the job identifiers.
1025

1026
    """
1027
    assert count > 0
1028
    # New number
1029
    serial = self._last_serial + count
1030

    
1031
    # Write to file
1032
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1033
                             "%s\n" % serial, True)
1034

    
1035
    result = [self._FormatJobID(v)
1036
              for v in range(self._last_serial + 1, serial + 1)]
1037
    # Keep it only if we were able to write the file
1038
    self._last_serial = serial
1039

    
1040
    return result
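
  # Illustrative sketch, not part of the original file: if the last serial
  # written to disk was 41, asking for three new serials writes "44" to the
  # serial file and hands back the formatted IDs 42..44:
  #
  #   >>> queue._NewSerialsUnlocked(3)
  #   ['42', '43', '44']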
1041

    
1042
  @staticmethod
1043
  def _GetJobPath(job_id):
1044
    """Returns the job file for a given job id.
1045

1046
    @type job_id: str
1047
    @param job_id: the job identifier
1048
    @rtype: str
1049
    @return: the path to the job file
1050

1051
    """
1052
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1053

    
1054
  @classmethod
1055
  def _GetArchivedJobPath(cls, job_id):
1056
    """Returns the archived job file for a give job id.
1057

1058
    @type job_id: str
1059
    @param job_id: the job identifier
1060
    @rtype: str
1061
    @return: the path to the archived job file
1062

1063
    """
1064
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1065
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
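
  # Illustrative note, not part of the original file: with
  # JOBS_PER_ARCHIVE_DIRECTORY set to 10000 above, archived jobs are spread
  # over numbered subdirectories of constants.JOB_QUEUE_ARCHIVE_DIR, e.g.
  #
  #   >>> JobQueue._GetArchiveDirectory("12345")
  #   '1'
  #
  # so job 12345 ends up under <archive dir>/1/job-12345.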
1066

    
1067
  def _GetJobIDsUnlocked(self, sort=True):
1068
    """Return all known job IDs.
1069

1070
    The method only looks at disk because it's a requirement that all
1071
    jobs are present on disk (so in the _memcache we don't have any
1072
    extra IDs).
1073

1074
    @type sort: boolean
1075
    @param sort: perform sorting on the returned job ids
1076
    @rtype: list
1077
    @return: the list of job IDs
1078

1079
    """
1080
    jlist = []
1081
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1082
      m = self._RE_JOB_FILE.match(filename)
1083
      if m:
1084
        jlist.append(m.group(1))
1085
    if sort:
1086
      jlist = utils.NiceSort(jlist)
1087
    return jlist
1088

    
1089
  def _LoadJobUnlocked(self, job_id):
1090
    """Loads a job from the disk or memory.
1091

1092
    Given a job id, this will return the cached job object if
1093
    existing, or try to load the job from the disk. If loading from
1094
    disk, it will also add the job to the cache.
1095

1096
    @param job_id: the job id
1097
    @rtype: L{_QueuedJob} or None
1098
    @return: either None or the job object
1099

1100
    """
1101
    job = self._memcache.get(job_id, None)
1102
    if job:
1103
      logging.debug("Found job %s in memcache", job_id)
1104
      return job
1105

    
1106
    try:
1107
      job = self._LoadJobFromDisk(job_id)
1108
    except errors.JobFileCorrupted:
1109
      old_path = self._GetJobPath(job_id)
1110
      new_path = self._GetArchivedJobPath(job_id)
1111
      if old_path == new_path:
1112
        # job already archived (future case)
1113
        logging.exception("Can't parse job %s", job_id)
1114
      else:
1115
        # non-archived case
1116
        logging.exception("Can't parse job %s, will archive.", job_id)
1117
        self._RenameFilesUnlocked([(old_path, new_path)])
1118
      return None
1119

    
1120
    self._memcache[job_id] = job
1121
    logging.debug("Added job %s to the cache", job_id)
1122
    return job
1123

    
1124
  def _LoadJobFromDisk(self, job_id):
1125
    """Load the given job file from disk.
1126

1127
    Given a job file, read, load and restore it in a _QueuedJob format.
1128

1129
    @type job_id: string
1130
    @param job_id: job identifier
1131
    @rtype: L{_QueuedJob} or None
1132
    @return: either None or the job object
1133

1134
    """
1135
    filepath = self._GetJobPath(job_id)
1136
    logging.debug("Loading job from %s", filepath)
1137
    try:
1138
      raw_data = utils.ReadFile(filepath)
1139
    except EnvironmentError, err:
1140
      if err.errno in (errno.ENOENT, ):
1141
        return None
1142
      raise
1143

    
1144
    try:
1145
      data = serializer.LoadJson(raw_data)
1146
      job = _QueuedJob.Restore(self, data)
1147
    except Exception, err: # pylint: disable-msg=W0703
1148
      raise errors.JobFileCorrupted(err)
1149

    
1150
    return job
1151

    
1152
  def SafeLoadJobFromDisk(self, job_id):
1153
    """Load the given job file from disk.
1154

1155
    Given a job file, read, load and restore it in a _QueuedJob format.
1156
    In case of error reading the job, it gets returned as None, and the
1157
    exception is logged.
1158

1159
    @type job_id: string
1160
    @param job_id: job identifier
1161
    @rtype: L{_QueuedJob} or None
1162
    @return: either None or the job object
1163

1164
    """
1165
    try:
1166
      return self._LoadJobFromDisk(job_id)
1167
    except (errors.JobFileCorrupted, EnvironmentError):
1168
      logging.exception("Can't load/parse job %s", job_id)
1169
      return None
1170

    
1171
  @staticmethod
1172
  def _IsQueueMarkedDrain():
1173
    """Check if the queue is marked from drain.
1174

1175
    This currently uses the queue drain file, which makes it a
1176
    per-node flag. In the future this can be moved to the config file.
1177

1178
    @rtype: boolean
1179
    @return: True if the job queue is marked for draining
1180

1181
    """
1182
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1183

    
1184
  def _UpdateQueueSizeUnlocked(self):
1185
    """Update the queue size.
1186

1187
    """
1188
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1189

    
1190
  @utils.LockedMethod
1191
  @_RequireOpenQueue
1192
  def SetDrainFlag(self, drain_flag):
1193
    """Sets the drain flag for the queue.
1194

1195
    @type drain_flag: boolean
1196
    @param drain_flag: Whether to set or unset the drain flag
1197

1198
    """
1199
    if drain_flag:
1200
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1201
    else:
1202
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1203

    
1204
    self._drained = drain_flag
1205

    
1206
    return True
1207

    
1208
  @_RequireOpenQueue
1209
  def _SubmitJobUnlocked(self, job_id, ops):
1210
    """Create and store a new job.
1211

1212
    This enters the job into our job queue and also puts it on the new
1213
    queue, in order for it to be picked up by the queue processors.
1214

1215
    @type job_id: job ID
1216
    @param job_id: the job ID for the new job
1217
    @type ops: list
1218
    @param ops: The list of OpCodes that will become the new job.
1219
    @rtype: L{_QueuedJob}
1220
    @return: the job object to be queued
1221
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1222
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
1223

1224
    """
1225
    # Ok when sharing the big job queue lock, as the drain file is created when
1226
    # the lock is exclusive.
1227
    if self._drained:
1228
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1229

    
1230
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1231
      raise errors.JobQueueFull()
1232

    
1233
    job = _QueuedJob(self, job_id, ops)
1234

    
1235
    # Write to disk
1236
    self.UpdateJobUnlocked(job)
1237

    
1238
    self._queue_size += 1
1239

    
1240
    logging.debug("Adding new job %s to the cache", job_id)
1241
    self._memcache[job_id] = job
1242

    
1243
    return job
1244

    
1245
  @utils.LockedMethod
1246
  @_RequireOpenQueue
1247
  def SubmitJob(self, ops):
1248
    """Create and store a new job.
1249

1250
    @see: L{_SubmitJobUnlocked}
1251

1252
    """
1253
    job_id = self._NewSerialsUnlocked(1)[0]
1254
    self._wpool.AddTask(self._SubmitJobUnlocked(job_id, ops))
1255
    return job_id
1256

    
1257
  @utils.LockedMethod
1258
  @_RequireOpenQueue
1259
  def SubmitManyJobs(self, jobs):
1260
    """Create and store multiple jobs.
1261

1262
    @see: L{_SubmitJobUnlocked}
1263

1264
    """
1265
    results = []
1266
    tasks = []
1267
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1268
    for job_id, ops in zip(all_job_ids, jobs):
1269
      try:
1270
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
1271
        status = True
1272
        data = job_id
1273
      except errors.GenericError, err:
1274
        data = str(err)
1275
        status = False
1276
      results.append((status, data))
1277
    self._wpool.AddManyTasks(tasks)
1278

    
1279
    return results
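
  # Illustrative note, not part of the original file: SubmitManyJobs() returns
  # one (status, data) tuple per requested job, where data is the new job ID
  # on success or an error message otherwise. For two well-formed opcode
  # lists op_list1 and op_list2 (hypothetical names) this might look like:
  #
  #   >>> queue.SubmitManyJobs([op_list1, op_list2])
  #   [(True, '42'), (True, '43')]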
1280

    
1281
  @_RequireOpenQueue
1282
  def UpdateJobUnlocked(self, job, replicate=True):
1283
    """Update a job's on disk storage.
1284

1285
    After a job has been modified, this function needs to be called in
1286
    order to write the changes to disk and replicate them to the other
1287
    nodes.
1288

1289
    @type job: L{_QueuedJob}
1290
    @param job: the changed job
1291
    @type replicate: boolean
1292
    @param replicate: whether to replicate the change to remote nodes
1293

1294
    """
1295
    filename = self._GetJobPath(job.id)
1296
    data = serializer.DumpJson(job.Serialize(), indent=False)
1297
    logging.debug("Writing job %s to %s", job.id, filename)
1298
    self._UpdateJobQueueFile(filename, data, replicate)
1299

    
1300
    # Notify waiters about potential changes
1301
    job.change.notifyAll()
1302

    
1303
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1304
                        timeout):
1305
    """Waits for changes in a job.
1306

1307
    @type job_id: string
1308
    @param job_id: Job identifier
1309
    @type fields: list of strings
1310
    @param fields: Which fields to check for changes
1311
    @type prev_job_info: list or None
1312
    @param prev_job_info: Last job information returned
1313
    @type prev_log_serial: int
1314
    @param prev_log_serial: Last job message serial number
1315
    @type timeout: float
1316
    @param timeout: maximum time to wait
1317
    @rtype: tuple (job info, log entries)
1318
    @return: a tuple of the job information as required via
1319
        the fields parameter, and the log entries as a list
1320

1321
        if the job has not changed and the timeout has expired,
1322
        we instead return a special value,
1323
        L{constants.JOB_NOTCHANGED}, which should be interpreted
1324
        as such by the clients
1325

1326
    """
1327
    helper = _WaitForJobChangesHelper(job_id, fields, prev_job_info,
1328
                                      prev_log_serial, self)
1329
    try:
1330
      return helper.WaitForChanges(timeout)
1331
    finally:
1332
      helper.Close()
1333

    
1334
  @utils.LockedMethod
1335
  @_RequireOpenQueue
1336
  def CancelJob(self, job_id):
1337
    """Cancels a job.
1338

1339
    This will only succeed if the job has not started yet.
1340

1341
    @type job_id: string
1342
    @param job_id: job ID of job to be cancelled.
1343

1344
    """
1345
    logging.info("Cancelling job %s", job_id)
1346

    
1347
    job = self._LoadJobUnlocked(job_id)
1348
    if not job:
1349
      logging.debug("Job %s not found", job_id)
1350
      return (False, "Job %s not found" % job_id)
1351

    
1352
    job_status = job.CalcStatus()
1353

    
1354
    if job_status not in (constants.JOB_STATUS_QUEUED,
1355
                          constants.JOB_STATUS_WAITLOCK):
1356
      logging.debug("Job %s is no longer waiting in the queue", job.id)
1357
      return (False, "Job %s is no longer waiting in the queue" % job.id)
1358

    
1359
    if job_status == constants.JOB_STATUS_QUEUED:
1360
      self.CancelJobUnlocked(job)
1361
      return (True, "Job %s canceled" % job.id)
1362

    
1363
    elif job_status == constants.JOB_STATUS_WAITLOCK:
1364
      # The worker will notice the new status and cancel the job
1365
      try:
1366
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1367
      finally:
1368
        self.UpdateJobUnlocked(job)
1369
      return (True, "Job %s will be canceled" % job.id)
1370

    
1371
  @_RequireOpenQueue
1372
  def CancelJobUnlocked(self, job):
1373
    """Marks a job as canceled.
1374

1375
    """
1376
    try:
1377
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1378
                            "Job canceled by request")
1379
    finally:
1380
      self.UpdateJobUnlocked(job)
1381

    
1382
  @_RequireOpenQueue
1383
  def _ArchiveJobsUnlocked(self, jobs):
1384
    """Archives jobs.
1385

1386
    @type jobs: list of L{_QueuedJob}
1387
    @param jobs: Job objects
1388
    @rtype: int
1389
    @return: Number of archived jobs
1390

1391
    """
1392
    archive_jobs = []
1393
    rename_files = []
1394
    for job in jobs:
1395
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1396
                                  constants.JOB_STATUS_SUCCESS,
1397
                                  constants.JOB_STATUS_ERROR):
1398
        logging.debug("Job %s is not yet done", job.id)
1399
        continue
1400

    
1401
      archive_jobs.append(job)
1402

    
1403
      old = self._GetJobPath(job.id)
1404
      new = self._GetArchivedJobPath(job.id)
1405
      rename_files.append((old, new))
1406

    
1407
    # TODO: What if 1..n files fail to rename?
1408
    self._RenameFilesUnlocked(rename_files)
1409

    
1410
    logging.debug("Successfully archived job(s) %s",
1411
                  utils.CommaJoin(job.id for job in archive_jobs))
1412

    
1413
    # Since we haven't quite checked, above, if we succeeded or failed renaming
1414
    # the files, we update the cached queue size from the filesystem. When we
1415
    # get around to fix the TODO: above, we can use the number of actually
1416
    # archived jobs to fix this.
1417
    self._UpdateQueueSizeUnlocked()
1418
    return len(archive_jobs)
1419

    
1420
  @utils.LockedMethod
1421
  @_RequireOpenQueue
1422
  def ArchiveJob(self, job_id):
1423
    """Archives a job.
1424

1425
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
1426

1427
    @type job_id: string
1428
    @param job_id: Job ID of job to be archived.
1429
    @rtype: bool
1430
    @return: Whether job was archived
1431

1432
    """
1433
    logging.info("Archiving job %s", job_id)
1434

    
1435
    job = self._LoadJobUnlocked(job_id)
1436
    if not job:
1437
      logging.debug("Job %s not found", job_id)
1438
      return False
1439

    
1440
    return self._ArchiveJobsUnlocked([job]) == 1
1441

    
1442
  @utils.LockedMethod
1443
  @_RequireOpenQueue
1444
  def AutoArchiveJobs(self, age, timeout):
1445
    """Archives all jobs based on age.
1446

1447
    The method will archive all jobs which are older than the age
1448
    parameter. For jobs that don't have an end timestamp, the start
1449
    timestamp will be considered. The special '-1' age will cause
1450
    archival of all jobs (that are not running or queued).
1451

1452
    @type age: int
1453
    @param age: the minimum age in seconds
1454

1455
    """
1456
    logging.info("Archiving jobs with age more than %s seconds", age)
1457

    
1458
    now = time.time()
1459
    end_time = now + timeout
1460
    archived_count = 0
1461
    last_touched = 0
1462

    
1463
    all_job_ids = self._GetJobIDsUnlocked()
1464
    pending = []
1465
    for idx, job_id in enumerate(all_job_ids):
1466
      last_touched = idx + 1
1467

    
1468
      # Not optimal because jobs could be pending
1469
      # TODO: Measure average duration for job archival and take number of
1470
      # pending jobs into account.
1471
      if time.time() > end_time:
1472
        break
1473

    
1474
      # Returns None if the job failed to load
1475
      job = self._LoadJobUnlocked(job_id)
1476
      if job:
1477
        if job.end_timestamp is None:
1478
          if job.start_timestamp is None:
1479
            job_age = job.received_timestamp
1480
          else:
1481
            job_age = job.start_timestamp
1482
        else:
1483
          job_age = job.end_timestamp
1484

    
1485
        if age == -1 or now - job_age[0] > age:
1486
          pending.append(job)
1487

    
1488
          # Archive 10 jobs at a time
1489
          if len(pending) >= 10:
1490
            archived_count += self._ArchiveJobsUnlocked(pending)
1491
            pending = []
1492

    
1493
    if pending:
1494
      archived_count += self._ArchiveJobsUnlocked(pending)
1495

    
1496
    return (archived_count, len(all_job_ids) - last_touched)
1497

    
1498
  def QueryJobs(self, job_ids, fields):
1499
    """Returns a list of jobs in queue.
1500

1501
    @type job_ids: list
1502
    @param job_ids: sequence of job identifiers or None for all
1503
    @type fields: list
1504
    @param fields: names of fields to return
1505
    @rtype: list
1506
    @return: a list with one element per job, each element being a list with
1507
        the requested fields
1508

1509
    """
1510
    jobs = []
1511
    list_all = False
1512
    if not job_ids:
1513
      # Since files are added to/removed from the queue atomically, there's no
1514
      # risk of getting the job ids in an inconsistent state.
1515
      job_ids = self._GetJobIDsUnlocked()
1516
      list_all = True
1517

    
1518
    for job_id in job_ids:
1519
      job = self.SafeLoadJobFromDisk(job_id)
1520
      if job is not None:
1521
        jobs.append(job.GetInfo(fields))
1522
      elif not list_all:
1523
        jobs.append(None)
1524

    
1525
    return jobs
1526

    
1527
  @utils.LockedMethod
1528
  @_RequireOpenQueue
1529
  def Shutdown(self):
1530
    """Stops the job queue.
1531

1532
    This shuts down all the worker threads and closes the queue.
1533

1534
    """
1535
    self._wpool.TerminateWorkers()
1536

    
1537
    self._queue_filelock.Close()
1538
    self._queue_filelock = None