#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

try:
  # pylint: disable-msg=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
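
# Note: all job and opcode timestamps below use this same split format,
# e.g. utils.SplitTime(1234567890.5) should give (1234567890, 500000),
# i.e. a (seconds, microseconds) pair (the concrete value here is only
# illustrative).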


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      }
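
  # For illustration, a freshly queued opcode serializes to a plain dict
  # along these lines (the field values here are made up):
  #
  #   {"input": {...},            # opcode state from __getstate__()
  #    "status": "queued",
  #    "result": None,
  #    "log": [],
  #    "start_timestamp": None,
  #    "exec_timestamp": None,
  #    "end_timestamp": None}
  #
  # Restore() is the inverse: it rebuilds the opcode via
  # opcodes.OpCode.LoadOpCode() and copies the remaining fields back.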


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar lock_status: In-memory locking information for debugging

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "lock_status", "change",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # In-memory attributes
    self.lock_status = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    # In-memory attributes
    obj.lock_status = None

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }
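
  # For illustration, the job file written to disk (see
  # JobQueue.UpdateJobUnlocked) is just this dict dumped as JSON, roughly:
  #
  #   {"id": "42",
  #    "ops": [<serialized _QueuedOpCode>, ...],
  #    "received_timestamp": [1234567890, 0],
  #    "start_timestamp": None,
  #    "end_timestamp": None}
  #
  # (the concrete values above are invented; only the key layout matters)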

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a canceled opcode, or one finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
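
  # For example (statuses invented): with opcode statuses
  #   [success, success, running, queued]
  # the loop above leaves the job in JOB_STATUS_RUNNING, while
  #   [success, error, queued]
  # stops early with JOB_STATUS_ERROR, and an all-success list yields
  # JOB_STATUS_SUCCESS.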

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(self.id)
      elif fname == "status":
        row.append(self.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in self.ops])
      elif fname == "opresult":
        row.append([op.result for op in self.ops])
      elif fname == "opstatus":
        row.append([op.status for op in self.ops])
      elif fname == "oplog":
        row.append([op.log for op in self.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in self.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in self.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in self.ops])
      elif fname == "received_ts":
        row.append(self.received_timestamp)
      elif fname == "start_ts":
        row.append(self.start_timestamp)
      elif fname == "end_ts":
        row.append(self.end_timestamp)
      elif fname == "lock_status":
        row.append(self.lock_status)
      elif fname == "summary":
        row.append([op.input.Summary() for op in self.ops])
      else:
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
    return row
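
  # For example (values invented): GetInfo(["id", "status", "summary"]) on a
  # two-opcode job could return something like
  #   ["42", "running", ["INSTANCE_SHUTDOWN", "INSTANCE_STARTUP"]]
  # i.e. one entry per requested field, in the same order as the fields list.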

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    self._queue.acquire()
    try:
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                                 constants.OP_STATUS_CANCELING)

      # All locks are acquired by now
      self._job.lock_status = None

      # Cancel here if we were asked to
      if self._op.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self._op.status = constants.OP_STATUS_RUNNING
      self._op.exec_timestamp = TimeStampNow()
    finally:
      self._queue.release()

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())

    self._queue.acquire()
    try:
      self._job.log_serial += 1
      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
      self._queue.UpdateJobUnlocked(self._job, replicate=False)
    finally:
      self._queue.release()

  def ReportLocks(self, msg):
    """Write locking information to the job.

    Called whenever the LU processor is waiting for a lock or has acquired one.

    """
    # Not getting the queue lock because this is a single assignment
    self._job.lock_status = msg


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  @type job_id: string
  @ivar job_id: id of the job we're watching
  @type prev_job_info: string
  @ivar prev_job_info: previous job info, as passed by the luxi client
  @type prev_log_serial: string
  @ivar prev_log_serial: previous job serial, as passed by the luxi client
  @type queue: L{JobQueue}
  @ivar queue: job queue (used for a few utility functions)
  @type job_path: string
  @ivar job_path: absolute path of the job file
  @type wm: pyinotify.WatchManager (or None)
  @ivar wm: inotify watch manager to watch for changes
  @type inotify_handler: L{asyncnotifier.SingleFileEventHandler}
  @ivar inotify_handler: single file event handler, used for watching
  @type notifier: pyinotify.Notifier
  @ivar notifier: inotify single-threaded notifier, used for watching

  """

  def __init__(self, job_id, fields, prev_job_info, prev_log_serial, queue):
    self.job_id = job_id
    self.fields = fields
    self.prev_job_info = prev_job_info
    self.prev_log_serial = prev_log_serial
    self.queue = queue
    # pylint: disable-msg=W0212
    self.job_path = self.queue._GetJobPath(self.job_id)
    self.wm = None
    self.inotify_handler = None
    self.notifier = None

  def _SetupInotify(self):
    """Create the inotify watch manager, handler and notifier.

    @raises errors.InotifyError: if the notifier cannot be set up

    """
    if self.wm:
      return
    self.wm = pyinotify.WatchManager()
    self.inotify_handler = asyncnotifier.SingleFileEventHandler(self.wm,
                                                                self.OnInotify,
                                                                self.job_path)
    self.notifier = pyinotify.Notifier(self.wm, self.inotify_handler)
    self.inotify_handler.enable()

  def _LoadDiskStatus(self):
    job = self.queue.SafeLoadJobFromDisk(self.job_id)
    if not job:
      raise errors.JobLost()
    self.job_status = job.CalcStatus()

    job_info = job.GetInfo(self.fields)
    log_entries = job.GetLogEntries(self.prev_log_serial)
    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    self.job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    self.log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

  def _CheckForChanges(self):
    self._LoadDiskStatus()
    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (self.job_status not in (constants.JOB_STATUS_QUEUED,
                                constants.JOB_STATUS_RUNNING,
                                constants.JOB_STATUS_WAITLOCK) or
        self.prev_job_info != self.job_info or
        (self.log_entries and self.prev_log_serial != self.log_entries[0][0])):
      logging.debug("Job %s changed", self.job_id)
      return (self.job_info, self.log_entries)

    raise utils.RetryAgain()

  def OnInotify(self, notifier_enabled):
    if not notifier_enabled:
      self.inotify_handler.enable()

  def WaitFn(self, timeout):
    self._SetupInotify()
    if self.notifier.check_events(timeout*1000):
      self.notifier.read_events()
    self.notifier.process_events()

  def WaitForChanges(self, timeout):
    try:
      return utils.Retry(self._CheckForChanges,
                         utils.RETRY_REMAINING_TIME,
                         timeout,
                         wait_fn=self.WaitFn)
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED

  def Close(self):
    if self.wm:
      self.notifier.stop()


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Processing job %s", job.id)
    proc = mcpu.Processor(self.pool.queue.context, job.id)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire()
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            # Make sure not to hold queue lock while calling ExecOpCode
            result = proc.ExecOpCode(input_opcode,
                                     _OpExecCallbacks(queue, job, op))

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                if isinstance(err, errors.GenericError):
                  op.result = errors.EncodeException(err)
                else:
                  op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.lock_status = None
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()

      logging.info("Finished job %s, status = %s", job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  @warning: Use this decorator only after utils.LockedMethod!

  Example::
    @utils.LockedMethod
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable-msg=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = 0
    self._UpdateQueueSizeUnlocked()
    self._drained = self._IsQueueMarkedDrain()

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                    "Unclean master daemon shutdown")
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our RPC calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    utils.WriteFile(file_name, data=data)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename files in the local queue directory
    and then replicate these renames to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of job identifiers

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    # The new serials start right after the previously used one
    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result
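
  # For example (numbers invented): with self._last_serial == 7,
  # _NewSerialsUnlocked(2) writes "9\n" to the serial file and returns
  # ["8", "9"]; the next call then continues from 9.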

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
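
  # For example (actual paths depend on the installed constants): with
  # JOBS_PER_ARCHIVE_DIRECTORY == 10000, job id "12345" maps to archive
  # directory "1", so its archived file would live at something like
  # <JOB_QUEUE_ARCHIVE_DIR>/1/job-12345.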

  def _GetJobIDsUnlocked(self, sort=True):
    """Return all known job IDs.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @type sort: boolean
    @param sort: perform sorting on the returned job ids
    @rtype: list
    @return: the list of job IDs

    """
    jlist = []
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
      m = self._RE_JOB_FILE.match(filename)
      if m:
        jlist.append(m.group(1))
    if sort:
      jlist = utils.NiceSort(jlist)
    return jlist

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    try:
      job = self._LoadJobFromDisk(job_id)
    except errors.JobFileCorrupted:
      old_path = self._GetJobPath(job_id)
      new_path = self._GetArchivedJobPath(job_id)
      if old_path == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(old_path, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _LoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      raw_data = utils.ReadFile(filepath)
    except EnvironmentError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise

    try:
      data = serializer.LoadJson(raw_data)
      job = _QueuedJob.Restore(self, data)
    except Exception, err: # pylint: disable-msg=W0703
      raise errors.JobFileCorrupted(err)

    return job

  def SafeLoadJobFromDisk(self, job_id):
    """Load the given job file from disk.

    Given a job file, read, load and restore it in a _QueuedJob format.
    In case of error reading the job, it gets returned as None, and the
    exception is logged.

    @type job_id: string
    @param job_id: job identifier
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    try:
      return self._LoadJobFromDisk(job_id)
    except (errors.JobFileCorrupted, EnvironmentError):
      logging.exception("Can't load/parse job %s", job_id)
      return None

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for draining.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  def _UpdateQueueSizeUnlocked(self):
    """Update the queue size.

    """
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))

  @utils.LockedMethod
  @_RequireOpenQueue
  def SetDrainFlag(self, drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)

    self._drained = drain_flag

    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: L{_QueuedJob}
    @return: the job object to be queued
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
    @raise errors.JobQueueFull: if the job queue has too many jobs in it

    """
    # Ok when sharing the big job queue lock, as the drain file is created when
    # the lock is exclusive.
    if self._drained:
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    self._queue_size += 1

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    return job

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    self._wpool.AddTask(self._SubmitJobUnlocked(job_id, ops))
    return job_id
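
  # Typical (sketched) usage from the master daemon side, assuming `context`
  # is a GanetiContext and `op` is any opcodes.OpCode instance:
  #
  #   queue = JobQueue(context)
  #   job_id = queue.SubmitJob([op])          # returns e.g. "8"
  #   info = queue.QueryJobs([job_id], ["status"])
  #   queue.Shutdown()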

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    tasks = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
        status = True
        data = job_id
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))
    self._wpool.AddManyTasks(tasks)

    return results

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)

  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    helper = _WaitForJobChangesHelper(job_id, fields, prev_job_info,
                                      prev_log_serial, self)
    try:
      return helper.WaitForChanges(timeout)
    finally:
      helper.Close()

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer waiting in the queue", job.id)
      return (False, "Job %s is no longer waiting in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    """
    try:
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                            "Job canceled by request")
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                  constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR):
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    # Since we haven't quite checked, above, if we succeeded or failed renaming
    # the files, we update the cached queue size from the filesystem. When we
    # get around to fix the TODO: above, we can use the number of actually
    # archived jobs to fix this.
    self._UpdateQueueSizeUnlocked()
    return len(archive_jobs)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked()
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a
        list with the requested fields

    """
    jobs = []
    list_all = False
    if not job_ids:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked()
      list_all = True

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id)
      if job is not None:
        jobs.append(job.GetInfo(fields))
      elif not list_all:
        jobs.append(None)

    return jobs

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_filelock.Close()
    self._queue_filelock = None