Removing all ssh setup code from the core
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import os
33 import logging
34 import errno
35 import re
36 import time
37 import weakref
38
39 try:
40   # pylint: disable-msg=E0611
41   from pyinotify import pyinotify
42 except ImportError:
43   import pyinotify
44
45 from ganeti import asyncnotifier
46 from ganeti import constants
47 from ganeti import serializer
48 from ganeti import workerpool
49 from ganeti import locking
50 from ganeti import opcodes
51 from ganeti import errors
52 from ganeti import mcpu
53 from ganeti import utils
54 from ganeti import jstore
55 from ganeti import rpc
56 from ganeti import netutils
57 from ganeti import compat
58
59
60 JOBQUEUE_THREADS = 25
61 JOBS_PER_ARCHIVE_DIRECTORY = 10000
62
63 # member lock names to be passed to @ssynchronized decorator
64 _LOCK = "_lock"
65 _QUEUE = "_queue"
66
67
68 class CancelJob(Exception):
69   """Special exception to cancel a job.
70
71   """
72
73
74 def TimeStampNow():
75   """Returns the current timestamp.
76
77   @rtype: tuple
78   @return: the current time in the (seconds, microseconds) format
79
80   """
81   return utils.SplitTime(time.time())
82
83
84 class _QueuedOpCode(object):
85   """Encapsulates an opcode object.
86
87   @ivar log: holds the execution log and consists of tuples
88   of the form C{(log_serial, timestamp, level, message)}
89   @ivar input: the OpCode we encapsulate
90   @ivar status: the current status
91   @ivar result: the result of the LU execution
92   @ivar start_timestamp: timestamp for the start of the execution
93   @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
94   @ivar stop_timestamp: timestamp for the end of the execution
95
96   """
97   __slots__ = ["input", "status", "result", "log",
98                "start_timestamp", "exec_timestamp", "end_timestamp",
99                "__weakref__"]
100
101   def __init__(self, op):
102     """Constructor for the _QuededOpCode.
103
104     @type op: L{opcodes.OpCode}
105     @param op: the opcode we encapsulate
106
107     """
108     self.input = op
109     self.status = constants.OP_STATUS_QUEUED
110     self.result = None
111     self.log = []
112     self.start_timestamp = None
113     self.exec_timestamp = None
114     self.end_timestamp = None
115
116   @classmethod
117   def Restore(cls, state):
118     """Restore the _QueuedOpCode from the serialized form.
119
120     @type state: dict
121     @param state: the serialized state
122     @rtype: _QueuedOpCode
123     @return: a new _QueuedOpCode instance
124
125     """
126     obj = _QueuedOpCode.__new__(cls)
127     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
128     obj.status = state["status"]
129     obj.result = state["result"]
130     obj.log = state["log"]
131     obj.start_timestamp = state.get("start_timestamp", None)
132     obj.exec_timestamp = state.get("exec_timestamp", None)
133     obj.end_timestamp = state.get("end_timestamp", None)
134     return obj
135
136   def Serialize(self):
137     """Serializes this _QueuedOpCode.
138
139     @rtype: dict
140     @return: the dictionary holding the serialized state
141
142     """
143     return {
144       "input": self.input.__getstate__(),
145       "status": self.status,
146       "result": self.result,
147       "log": self.log,
148       "start_timestamp": self.start_timestamp,
149       "exec_timestamp": self.exec_timestamp,
150       "end_timestamp": self.end_timestamp,
151       }
152
153
154 class _QueuedJob(object):
155   """In-memory job representation.
156
157   This is what we use to track the user-submitted jobs. Locking must
158   be taken care of by users of this class.
159
160   @type queue: L{JobQueue}
161   @ivar queue: the parent queue
162   @ivar id: the job ID
163   @type ops: list
164   @ivar ops: the list of _QueuedOpCode that constitute the job
165   @type log_serial: int
166   @ivar log_serial: holds the index for the next log entry
167   @ivar received_timestamp: the timestamp for when the job was received
168   @ivar start_timestmap: the timestamp for start of execution
169   @ivar end_timestamp: the timestamp for end of execution
170   @ivar lock_status: In-memory locking information for debugging
171
172   """
173   # pylint: disable-msg=W0212
174   __slots__ = ["queue", "id", "ops", "log_serial",
175                "received_timestamp", "start_timestamp", "end_timestamp",
176                "lock_status", "change",
177                "__weakref__"]
178
179   def __init__(self, queue, job_id, ops):
180     """Constructor for the _QueuedJob.
181
182     @type queue: L{JobQueue}
183     @param queue: our parent queue
184     @type job_id: job_id
185     @param job_id: our job id
186     @type ops: list
187     @param ops: the list of opcodes we hold, which will be encapsulated
188         in _QueuedOpCodes
189
190     """
191     if not ops:
192       raise errors.GenericError("A job needs at least one opcode")
193
194     self.queue = queue
195     self.id = job_id
196     self.ops = [_QueuedOpCode(op) for op in ops]
197     self.log_serial = 0
198     self.received_timestamp = TimeStampNow()
199     self.start_timestamp = None
200     self.end_timestamp = None
201
202     # In-memory attributes
203     self.lock_status = None
204
205   def __repr__(self):
206     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
207               "id=%s" % self.id,
208               "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
209
210     return "<%s at %#x>" % (" ".join(status), id(self))
211
212   @classmethod
213   def Restore(cls, queue, state):
214     """Restore a _QueuedJob from serialized state:
215
216     @type queue: L{JobQueue}
217     @param queue: to which queue the restored job belongs
218     @type state: dict
219     @param state: the serialized state
220     @rtype: _JobQueue
221     @return: the restored _JobQueue instance
222
223     """
224     obj = _QueuedJob.__new__(cls)
225     obj.queue = queue
226     obj.id = state["id"]
227     obj.received_timestamp = state.get("received_timestamp", None)
228     obj.start_timestamp = state.get("start_timestamp", None)
229     obj.end_timestamp = state.get("end_timestamp", None)
230
231     # In-memory attributes
232     obj.lock_status = None
233
234     obj.ops = []
235     obj.log_serial = 0
236     for op_state in state["ops"]:
237       op = _QueuedOpCode.Restore(op_state)
238       for log_entry in op.log:
239         obj.log_serial = max(obj.log_serial, log_entry[0])
240       obj.ops.append(op)
241
242     return obj
243
244   def Serialize(self):
245     """Serialize the _JobQueue instance.
246
247     @rtype: dict
248     @return: the serialized state
249
250     """
251     return {
252       "id": self.id,
253       "ops": [op.Serialize() for op in self.ops],
254       "start_timestamp": self.start_timestamp,
255       "end_timestamp": self.end_timestamp,
256       "received_timestamp": self.received_timestamp,
257       }
258
259   def CalcStatus(self):
260     """Compute the status of this job.
261
262     This function iterates over all the _QueuedOpCodes in the job and
263     based on their status, computes the job status.
264
265     The algorithm is:
266       - if we find a cancelled, or finished with error, the job
267         status will be the same
268       - otherwise, the last opcode with the status one of:
269           - waitlock
270           - canceling
271           - running
272
273         will determine the job status
274
275       - otherwise, it means either all opcodes are queued, or success,
276         and the job status will be the same
277
278     @return: the job status
279
280     """
281     status = constants.JOB_STATUS_QUEUED
282
283     all_success = True
284     for op in self.ops:
285       if op.status == constants.OP_STATUS_SUCCESS:
286         continue
287
288       all_success = False
289
290       if op.status == constants.OP_STATUS_QUEUED:
291         pass
292       elif op.status == constants.OP_STATUS_WAITLOCK:
293         status = constants.JOB_STATUS_WAITLOCK
294       elif op.status == constants.OP_STATUS_RUNNING:
295         status = constants.JOB_STATUS_RUNNING
296       elif op.status == constants.OP_STATUS_CANCELING:
297         status = constants.JOB_STATUS_CANCELING
298         break
299       elif op.status == constants.OP_STATUS_ERROR:
300         status = constants.JOB_STATUS_ERROR
301         # The whole job fails if one opcode failed
302         break
303       elif op.status == constants.OP_STATUS_CANCELED:
304         status = constants.OP_STATUS_CANCELED
305         break
306
307     if all_success:
308       status = constants.JOB_STATUS_SUCCESS
309
310     return status
311
312   def GetLogEntries(self, newer_than):
313     """Selectively returns the log entries.
314
315     @type newer_than: None or int
316     @param newer_than: if this is None, return all log entries,
317         otherwise return only the log entries with serial higher
318         than this value
319     @rtype: list
320     @return: the list of the log entries selected
321
322     """
323     if newer_than is None:
324       serial = -1
325     else:
326       serial = newer_than
327
328     entries = []
329     for op in self.ops:
330       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
331
332     return entries
333
334   def GetInfo(self, fields):
335     """Returns information about a job.
336
337     @type fields: list
338     @param fields: names of fields to return
339     @rtype: list
340     @return: list with one element for each field
341     @raise errors.OpExecError: when an invalid field
342         has been passed
343
344     """
345     row = []
346     for fname in fields:
347       if fname == "id":
348         row.append(self.id)
349       elif fname == "status":
350         row.append(self.CalcStatus())
351       elif fname == "ops":
352         row.append([op.input.__getstate__() for op in self.ops])
353       elif fname == "opresult":
354         row.append([op.result for op in self.ops])
355       elif fname == "opstatus":
356         row.append([op.status for op in self.ops])
357       elif fname == "oplog":
358         row.append([op.log for op in self.ops])
359       elif fname == "opstart":
360         row.append([op.start_timestamp for op in self.ops])
361       elif fname == "opexec":
362         row.append([op.exec_timestamp for op in self.ops])
363       elif fname == "opend":
364         row.append([op.end_timestamp for op in self.ops])
365       elif fname == "received_ts":
366         row.append(self.received_timestamp)
367       elif fname == "start_ts":
368         row.append(self.start_timestamp)
369       elif fname == "end_ts":
370         row.append(self.end_timestamp)
371       elif fname == "lock_status":
372         row.append(self.lock_status)
373       elif fname == "summary":
374         row.append([op.input.Summary() for op in self.ops])
375       else:
376         raise errors.OpExecError("Invalid self query field '%s'" % fname)
377     return row
378
379   def MarkUnfinishedOps(self, status, result):
380     """Mark unfinished opcodes with a given status and result.
381
382     This is an utility function for marking all running or waiting to
383     be run opcodes with a given status. Opcodes which are already
384     finalised are not changed.
385
386     @param status: a given opcode status
387     @param result: the opcode result
388
389     """
390     try:
391       not_marked = True
392       for op in self.ops:
393         if op.status in constants.OPS_FINALIZED:
394           assert not_marked, "Finalized opcodes found after non-finalized ones"
395           continue
396         op.status = status
397         op.result = result
398         not_marked = False
399     finally:
400       self.queue.UpdateJobUnlocked(self)
401
402
403 class _OpExecCallbacks(mcpu.OpExecCbBase):
404   def __init__(self, queue, job, op):
405     """Initializes this class.
406
407     @type queue: L{JobQueue}
408     @param queue: Job queue
409     @type job: L{_QueuedJob}
410     @param job: Job object
411     @type op: L{_QueuedOpCode}
412     @param op: OpCode
413
414     """
415     assert queue, "Queue is missing"
416     assert job, "Job is missing"
417     assert op, "Opcode is missing"
418
419     self._queue = queue
420     self._job = job
421     self._op = op
422
423   def _CheckCancel(self):
424     """Raises an exception to cancel the job if asked to.
425
426     """
427     # Cancel here if we were asked to
428     if self._op.status == constants.OP_STATUS_CANCELING:
429       logging.debug("Canceling opcode")
430       raise CancelJob()
431
432   @locking.ssynchronized(_QUEUE, shared=1)
433   def NotifyStart(self):
434     """Mark the opcode as running, not lock-waiting.
435
436     This is called from the mcpu code as a notifier function, when the LU is
437     finally about to start the Exec() method. Of course, to have end-user
438     visible results, the opcode must be initially (before calling into
439     Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
440
441     """
442     assert self._op.status in (constants.OP_STATUS_WAITLOCK,
443                                constants.OP_STATUS_CANCELING)
444
445     # All locks are acquired by now
446     self._job.lock_status = None
447
448     # Cancel here if we were asked to
449     self._CheckCancel()
450
451     logging.debug("Opcode is now running")
452     self._op.status = constants.OP_STATUS_RUNNING
453     self._op.exec_timestamp = TimeStampNow()
454
455     # And finally replicate the job status
456     self._queue.UpdateJobUnlocked(self._job)
457
458   @locking.ssynchronized(_QUEUE, shared=1)
459   def _AppendFeedback(self, timestamp, log_type, log_msg):
460     """Internal feedback append function, with locks
461
462     """
463     self._job.log_serial += 1
464     self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
465     self._queue.UpdateJobUnlocked(self._job, replicate=False)
466
467   def Feedback(self, *args):
468     """Append a log entry.
469
470     """
471     assert len(args) < 3
472
473     if len(args) == 1:
474       log_type = constants.ELOG_MESSAGE
475       log_msg = args[0]
476     else:
477       (log_type, log_msg) = args
478
479     # The time is split to make serialization easier and not lose
480     # precision.
481     timestamp = utils.SplitTime(time.time())
482     self._AppendFeedback(timestamp, log_type, log_msg)
483
484   def ReportLocks(self, msg):
485     """Write locking information to the job.
486
487     Called whenever the LU processor is waiting for a lock or has acquired one.
488
489     """
490     assert self._op.status in (constants.OP_STATUS_WAITLOCK,
491                                constants.OP_STATUS_CANCELING)
492
493     # Not getting the queue lock because this is a single assignment
494     self._job.lock_status = msg
495
496     # Cancel here if we were asked to
497     self._CheckCancel()
498
499
500 class _JobChangesChecker(object):
501   def __init__(self, fields, prev_job_info, prev_log_serial):
502     """Initializes this class.
503
504     @type fields: list of strings
505     @param fields: Fields requested by LUXI client
506     @type prev_job_info: string
507     @param prev_job_info: previous job info, as passed by the LUXI client
508     @type prev_log_serial: string
509     @param prev_log_serial: previous job serial, as passed by the LUXI client
510
511     """
512     self._fields = fields
513     self._prev_job_info = prev_job_info
514     self._prev_log_serial = prev_log_serial
515
516   def __call__(self, job):
517     """Checks whether job has changed.
518
519     @type job: L{_QueuedJob}
520     @param job: Job object
521
522     """
523     status = job.CalcStatus()
524     job_info = job.GetInfo(self._fields)
525     log_entries = job.GetLogEntries(self._prev_log_serial)
526
527     # Serializing and deserializing data can cause type changes (e.g. from
528     # tuple to list) or precision loss. We're doing it here so that we get
529     # the same modifications as the data received from the client. Without
530     # this, the comparison afterwards might fail without the data being
531     # significantly different.
532     # TODO: we just deserialized from disk, investigate how to make sure that
533     # the job info and log entries are compatible to avoid this further step.
534     # TODO: Doing something like in testutils.py:UnifyValueType might be more
535     # efficient, though floats will be tricky
536     job_info = serializer.LoadJson(serializer.DumpJson(job_info))
537     log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
538
539     # Don't even try to wait if the job is no longer running, there will be
540     # no changes.
541     if (status not in (constants.JOB_STATUS_QUEUED,
542                        constants.JOB_STATUS_RUNNING,
543                        constants.JOB_STATUS_WAITLOCK) or
544         job_info != self._prev_job_info or
545         (log_entries and self._prev_log_serial != log_entries[0][0])):
546       logging.debug("Job %s changed", job.id)
547       return (job_info, log_entries)
548
549     return None
550
551
552 class _JobFileChangesWaiter(object):
553   def __init__(self, filename):
554     """Initializes this class.
555
556     @type filename: string
557     @param filename: Path to job file
558     @raises errors.InotifyError: if the notifier cannot be setup
559
560     """
561     self._wm = pyinotify.WatchManager()
562     self._inotify_handler = \
563       asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
564     self._notifier = \
565       pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
566     try:
567       self._inotify_handler.enable()
568     except Exception:
569       # pyinotify doesn't close file descriptors automatically
570       self._notifier.stop()
571       raise
572
573   def _OnInotify(self, notifier_enabled):
574     """Callback for inotify.
575
576     """
577     if not notifier_enabled:
578       self._inotify_handler.enable()
579
580   def Wait(self, timeout):
581     """Waits for the job file to change.
582
583     @type timeout: float
584     @param timeout: Timeout in seconds
585     @return: Whether there have been events
586
587     """
588     assert timeout >= 0
589     have_events = self._notifier.check_events(timeout * 1000)
590     if have_events:
591       self._notifier.read_events()
592     self._notifier.process_events()
593     return have_events
594
595   def Close(self):
596     """Closes underlying notifier and its file descriptor.
597
598     """
599     self._notifier.stop()
600
601
602 class _JobChangesWaiter(object):
603   def __init__(self, filename):
604     """Initializes this class.
605
606     @type filename: string
607     @param filename: Path to job file
608
609     """
610     self._filewaiter = None
611     self._filename = filename
612
613   def Wait(self, timeout):
614     """Waits for a job to change.
615
616     @type timeout: float
617     @param timeout: Timeout in seconds
618     @return: Whether there have been events
619
620     """
621     if self._filewaiter:
622       return self._filewaiter.Wait(timeout)
623
624     # Lazy setup: Avoid inotify setup cost when job file has already changed.
625     # If this point is reached, return immediately and let caller check the job
626     # file again in case there were changes since the last check. This avoids a
627     # race condition.
628     self._filewaiter = _JobFileChangesWaiter(self._filename)
629
630     return True
631
632   def Close(self):
633     """Closes underlying waiter.
634
635     """
636     if self._filewaiter:
637       self._filewaiter.Close()
638
639
640 class _WaitForJobChangesHelper(object):
641   """Helper class using inotify to wait for changes in a job file.
642
643   This class takes a previous job status and serial, and alerts the client when
644   the current job status has changed.
645
646   """
647   @staticmethod
648   def _CheckForChanges(job_load_fn, check_fn):
649     job = job_load_fn()
650     if not job:
651       raise errors.JobLost()
652
653     result = check_fn(job)
654     if result is None:
655       raise utils.RetryAgain()
656
657     return result
658
659   def __call__(self, filename, job_load_fn,
660                fields, prev_job_info, prev_log_serial, timeout):
661     """Waits for changes on a job.
662
663     @type filename: string
664     @param filename: File on which to wait for changes
665     @type job_load_fn: callable
666     @param job_load_fn: Function to load job
667     @type fields: list of strings
668     @param fields: Which fields to check for changes
669     @type prev_job_info: list or None
670     @param prev_job_info: Last job information returned
671     @type prev_log_serial: int
672     @param prev_log_serial: Last job message serial number
673     @type timeout: float
674     @param timeout: maximum time to wait in seconds
675
676     """
677     try:
678       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
679       waiter = _JobChangesWaiter(filename)
680       try:
681         return utils.Retry(compat.partial(self._CheckForChanges,
682                                           job_load_fn, check_fn),
683                            utils.RETRY_REMAINING_TIME, timeout,
684                            wait_fn=waiter.Wait)
685       finally:
686         waiter.Close()
687     except (errors.InotifyError, errors.JobLost):
688       return None
689     except utils.RetryTimeout:
690       return constants.JOB_NOTCHANGED
691
692
693 class _JobQueueWorker(workerpool.BaseWorker):
694   """The actual job workers.
695
696   """
697   def RunTask(self, job): # pylint: disable-msg=W0221
698     """Job executor.
699
700     This functions processes a job. It is closely tied to the _QueuedJob and
701     _QueuedOpCode classes.
702
703     @type job: L{_QueuedJob}
704     @param job: the job to be processed
705
706     """
707     logging.info("Processing job %s", job.id)
708     proc = mcpu.Processor(self.pool.queue.context, job.id)
709     queue = job.queue
710     try:
711       try:
712         count = len(job.ops)
713         for idx, op in enumerate(job.ops):
714           op_summary = op.input.Summary()
715           if op.status == constants.OP_STATUS_SUCCESS:
716             # this is a job that was partially completed before master
717             # daemon shutdown, so it can be expected that some opcodes
718             # are already completed successfully (if any did error
719             # out, then the whole job should have been aborted and not
720             # resubmitted for processing)
721             logging.info("Op %s/%s: opcode %s already processed, skipping",
722                          idx + 1, count, op_summary)
723             continue
724           try:
725             logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
726                          op_summary)
727
728             queue.acquire(shared=1)
729             try:
730               if op.status == constants.OP_STATUS_CANCELED:
731                 logging.debug("Canceling opcode")
732                 raise CancelJob()
733               assert op.status == constants.OP_STATUS_QUEUED
734               logging.debug("Opcode %s/%s waiting for locks",
735                             idx + 1, count)
736               op.status = constants.OP_STATUS_WAITLOCK
737               op.result = None
738               op.start_timestamp = TimeStampNow()
739               if idx == 0: # first opcode
740                 job.start_timestamp = op.start_timestamp
741               queue.UpdateJobUnlocked(job)
742
743               input_opcode = op.input
744             finally:
745               queue.release()
746
747             # Make sure not to hold queue lock while calling ExecOpCode
748             result = proc.ExecOpCode(input_opcode,
749                                      _OpExecCallbacks(queue, job, op))
750
751             queue.acquire(shared=1)
752             try:
753               logging.debug("Opcode %s/%s succeeded", idx + 1, count)
754               op.status = constants.OP_STATUS_SUCCESS
755               op.result = result
756               op.end_timestamp = TimeStampNow()
757               if idx == count - 1:
758                 job.lock_status = None
759                 job.end_timestamp = TimeStampNow()
760
761                 # Consistency check
762                 assert compat.all(i.status == constants.OP_STATUS_SUCCESS
763                                   for i in job.ops)
764
765               queue.UpdateJobUnlocked(job)
766             finally:
767               queue.release()
768
769             logging.info("Op %s/%s: Successfully finished opcode %s",
770                          idx + 1, count, op_summary)
771           except CancelJob:
772             # Will be handled further up
773             raise
774           except Exception, err:
775             queue.acquire(shared=1)
776             try:
777               try:
778                 logging.debug("Opcode %s/%s failed", idx + 1, count)
779                 op.status = constants.OP_STATUS_ERROR
780                 if isinstance(err, errors.GenericError):
781                   to_encode = err
782                 else:
783                   to_encode = errors.OpExecError(str(err))
784                 op.result = errors.EncodeException(to_encode)
785                 op.end_timestamp = TimeStampNow()
786                 logging.info("Op %s/%s: Error in opcode %s: %s",
787                              idx + 1, count, op_summary, err)
788
789                 to_encode = errors.OpExecError("Preceding opcode failed")
790                 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
791                                       errors.EncodeException(to_encode))
792
793                 # Consistency check
794                 assert compat.all(i.status == constants.OP_STATUS_SUCCESS
795                                   for i in job.ops[:idx])
796                 assert compat.all(i.status == constants.OP_STATUS_ERROR and
797                                   errors.GetEncodedError(i.result)
798                                   for i in job.ops[idx:])
799               finally:
800                 job.lock_status = None
801                 job.end_timestamp = TimeStampNow()
802                 queue.UpdateJobUnlocked(job)
803             finally:
804               queue.release()
805             raise
806
807       except CancelJob:
808         queue.acquire(shared=1)
809         try:
810           job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
811                                 "Job canceled by request")
812           job.lock_status = None
813           job.end_timestamp = TimeStampNow()
814           queue.UpdateJobUnlocked(job)
815         finally:
816           queue.release()
817       except errors.GenericError, err:
818         logging.exception("Ganeti exception")
819       except:
820         logging.exception("Unhandled exception")
821     finally:
822       status = job.CalcStatus()
823       logging.info("Finished job %s, status = %s", job.id, status)
824
825
826 class _JobQueueWorkerPool(workerpool.WorkerPool):
827   """Simple class implementing a job-processing workerpool.
828
829   """
830   def __init__(self, queue):
831     super(_JobQueueWorkerPool, self).__init__("JobQueue",
832                                               JOBQUEUE_THREADS,
833                                               _JobQueueWorker)
834     self.queue = queue
835
836
837 def _RequireOpenQueue(fn):
838   """Decorator for "public" functions.
839
840   This function should be used for all 'public' functions. That is,
841   functions usually called from other classes. Note that this should
842   be applied only to methods (not plain functions), since it expects
843   that the decorated function is called with a first argument that has
844   a '_queue_filelock' argument.
845
846   @warning: Use this decorator only after locking.ssynchronized
847
848   Example::
849     @locking.ssynchronized(_LOCK)
850     @_RequireOpenQueue
851     def Example(self):
852       pass
853
854   """
855   def wrapper(self, *args, **kwargs):
856     # pylint: disable-msg=W0212
857     assert self._queue_filelock is not None, "Queue should be open"
858     return fn(self, *args, **kwargs)
859   return wrapper
860
861
862 class JobQueue(object):
863   """Queue used to manage the jobs.
864
865   @cvar _RE_JOB_FILE: regex matching the valid job file names
866
867   """
868   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
869
870   def __init__(self, context):
871     """Constructor for JobQueue.
872
873     The constructor will initialize the job queue object and then
874     start loading the current jobs from disk, either for starting them
875     (if they were queue) or for aborting them (if they were already
876     running).
877
878     @type context: GanetiContext
879     @param context: the context object for access to the configuration
880         data and other ganeti objects
881
882     """
883     self.context = context
884     self._memcache = weakref.WeakValueDictionary()
885     self._my_hostname = netutils.Hostname.GetSysName()
886
887     # The Big JobQueue lock. If a code block or method acquires it in shared
888     # mode safe it must guarantee concurrency with all the code acquiring it in
889     # shared mode, including itself. In order not to acquire it at all
890     # concurrency must be guaranteed with all code acquiring it in shared mode
891     # and all code acquiring it exclusively.
892     self._lock = locking.SharedLock("JobQueue")
893
894     self.acquire = self._lock.acquire
895     self.release = self._lock.release
896
897     # Initialize the queue, and acquire the filelock.
898     # This ensures no other process is working on the job queue.
899     self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
900
901     # Read serial file
902     self._last_serial = jstore.ReadSerial()
903     assert self._last_serial is not None, ("Serial file was modified between"
904                                            " check in jstore and here")
905
906     # Get initial list of nodes
907     self._nodes = dict((n.name, n.primary_ip)
908                        for n in self.context.cfg.GetAllNodesInfo().values()
909                        if n.master_candidate)
910
911     # Remove master node
912     self._nodes.pop(self._my_hostname, None)
913
914     # TODO: Check consistency across nodes
915
916     self._queue_size = 0
917     self._UpdateQueueSizeUnlocked()
918     self._drained = self._IsQueueMarkedDrain()
919
920     # Setup worker pool
921     self._wpool = _JobQueueWorkerPool(self)
922     try:
923       # We need to lock here because WorkerPool.AddTask() may start a job while
924       # we're still doing our work.
925       self.acquire()
926       try:
927         logging.info("Inspecting job queue")
928
929         all_job_ids = self._GetJobIDsUnlocked()
930         jobs_count = len(all_job_ids)
931         lastinfo = time.time()
932         for idx, job_id in enumerate(all_job_ids):
933           # Give an update every 1000 jobs or 10 seconds
934           if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
935               idx == (jobs_count - 1)):
936             logging.info("Job queue inspection: %d/%d (%0.1f %%)",
937                          idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
938             lastinfo = time.time()
939
940           job = self._LoadJobUnlocked(job_id)
941
942           # a failure in loading the job can cause 'None' to be returned
943           if job is None:
944             continue
945
946           status = job.CalcStatus()
947
948           if status in (constants.JOB_STATUS_QUEUED, ):
949             self._wpool.AddTask((job, ))
950
951           elif status in (constants.JOB_STATUS_RUNNING,
952                           constants.JOB_STATUS_WAITLOCK,
953                           constants.JOB_STATUS_CANCELING):
954             logging.warning("Unfinished job %s found: %s", job.id, job)
955             job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
956                                   "Unclean master daemon shutdown")
957
958         logging.info("Job queue inspection finished")
959       finally:
960         self.release()
961     except:
962       self._wpool.TerminateWorkers()
963       raise
964
965   @locking.ssynchronized(_LOCK)
966   @_RequireOpenQueue
967   def AddNode(self, node):
968     """Register a new node with the queue.
969
970     @type node: L{objects.Node}
971     @param node: the node object to be added
972
973     """
974     node_name = node.name
975     assert node_name != self._my_hostname
976
977     # Clean queue directory on added node
978     result = rpc.RpcRunner.call_jobqueue_purge(node_name)
979     msg = result.fail_msg
980     if msg:
981       logging.warning("Cannot cleanup queue directory on node %s: %s",
982                       node_name, msg)
983
984     if not node.master_candidate:
985       # remove if existing, ignoring errors
986       self._nodes.pop(node_name, None)
987       # and skip the replication of the job ids
988       return
989
990     # Upload the whole queue excluding archived jobs
991     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
992
993     # Upload current serial file
994     files.append(constants.JOB_QUEUE_SERIAL_FILE)
995
996     for file_name in files:
997       # Read file content
998       content = utils.ReadFile(file_name)
999
1000       result = rpc.RpcRunner.call_jobqueue_update([node_name],
1001                                                   [node.primary_ip],
1002                                                   file_name, content)
1003       msg = result[node_name].fail_msg
1004       if msg:
1005         logging.error("Failed to upload file %s to node %s: %s",
1006                       file_name, node_name, msg)
1007
1008     self._nodes[node_name] = node.primary_ip
1009
1010   @locking.ssynchronized(_LOCK)
1011   @_RequireOpenQueue
1012   def RemoveNode(self, node_name):
1013     """Callback called when removing nodes from the cluster.
1014
1015     @type node_name: str
1016     @param node_name: the name of the node to remove
1017
1018     """
1019     self._nodes.pop(node_name, None)
1020
1021   @staticmethod
1022   def _CheckRpcResult(result, nodes, failmsg):
1023     """Verifies the status of an RPC call.
1024
1025     Since we aim to keep consistency should this node (the current
1026     master) fail, we will log errors if our rpc fail, and especially
1027     log the case when more than half of the nodes fails.
1028
1029     @param result: the data as returned from the rpc call
1030     @type nodes: list
1031     @param nodes: the list of nodes we made the call to
1032     @type failmsg: str
1033     @param failmsg: the identifier to be used for logging
1034
1035     """
1036     failed = []
1037     success = []
1038
1039     for node in nodes:
1040       msg = result[node].fail_msg
1041       if msg:
1042         failed.append(node)
1043         logging.error("RPC call %s (%s) failed on node %s: %s",
1044                       result[node].call, failmsg, node, msg)
1045       else:
1046         success.append(node)
1047
1048     # +1 for the master node
1049     if (len(success) + 1) < len(failed):
1050       # TODO: Handle failing nodes
1051       logging.error("More than half of the nodes failed")
1052
1053   def _GetNodeIp(self):
1054     """Helper for returning the node name/ip list.
1055
1056     @rtype: (list, list)
1057     @return: a tuple of two lists, the first one with the node
1058         names and the second one with the node addresses
1059
1060     """
1061     # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1062     name_list = self._nodes.keys()
1063     addr_list = [self._nodes[name] for name in name_list]
1064     return name_list, addr_list
1065
1066   def _UpdateJobQueueFile(self, file_name, data, replicate):
1067     """Writes a file locally and then replicates it to all nodes.
1068
1069     This function will replace the contents of a file on the local
1070     node and then replicate it to all the other nodes we have.
1071
1072     @type file_name: str
1073     @param file_name: the path of the file to be replicated
1074     @type data: str
1075     @param data: the new contents of the file
1076     @type replicate: boolean
1077     @param replicate: whether to spread the changes to the remote nodes
1078
1079     """
1080     utils.WriteFile(file_name, data=data)
1081
1082     if replicate:
1083       names, addrs = self._GetNodeIp()
1084       result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1085       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1086
1087   def _RenameFilesUnlocked(self, rename):
1088     """Renames a file locally and then replicate the change.
1089
1090     This function will rename a file in the local queue directory
1091     and then replicate this rename to all the other nodes we have.
1092
1093     @type rename: list of (old, new)
1094     @param rename: List containing tuples mapping old to new names
1095
1096     """
1097     # Rename them locally
1098     for old, new in rename:
1099       utils.RenameFile(old, new, mkdir=True)
1100
1101     # ... and on all nodes
1102     names, addrs = self._GetNodeIp()
1103     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1104     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1105
1106   @staticmethod
1107   def _FormatJobID(job_id):
1108     """Convert a job ID to string format.
1109
1110     Currently this just does C{str(job_id)} after performing some
1111     checks, but if we want to change the job id format this will
1112     abstract this change.
1113
1114     @type job_id: int or long
1115     @param job_id: the numeric job id
1116     @rtype: str
1117     @return: the formatted job id
1118
1119     """
1120     if not isinstance(job_id, (int, long)):
1121       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1122     if job_id < 0:
1123       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1124
1125     return str(job_id)
1126
1127   @classmethod
1128   def _GetArchiveDirectory(cls, job_id):
1129     """Returns the archive directory for a job.
1130
1131     @type job_id: str
1132     @param job_id: Job identifier
1133     @rtype: str
1134     @return: Directory name
1135
1136     """
1137     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1138
1139   def _NewSerialsUnlocked(self, count):
1140     """Generates a new job identifier.
1141
1142     Job identifiers are unique during the lifetime of a cluster.
1143
1144     @type count: integer
1145     @param count: how many serials to return
1146     @rtype: str
1147     @return: a string representing the job identifier.
1148
1149     """
1150     assert count > 0
1151     # New number
1152     serial = self._last_serial + count
1153
1154     # Write to file
1155     self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1156                              "%s\n" % serial, True)
1157
1158     result = [self._FormatJobID(v)
1159               for v in range(self._last_serial, serial + 1)]
1160     # Keep it only if we were able to write the file
1161     self._last_serial = serial
1162
1163     return result
1164
1165   @staticmethod
1166   def _GetJobPath(job_id):
1167     """Returns the job file for a given job id.
1168
1169     @type job_id: str
1170     @param job_id: the job identifier
1171     @rtype: str
1172     @return: the path to the job file
1173
1174     """
1175     return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1176
1177   @classmethod
1178   def _GetArchivedJobPath(cls, job_id):
1179     """Returns the archived job file for a give job id.
1180
1181     @type job_id: str
1182     @param job_id: the job identifier
1183     @rtype: str
1184     @return: the path to the archived job file
1185
1186     """
1187     return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1188                           cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1189
1190   def _GetJobIDsUnlocked(self, sort=True):
1191     """Return all known job IDs.
1192
1193     The method only looks at disk because it's a requirement that all
1194     jobs are present on disk (so in the _memcache we don't have any
1195     extra IDs).
1196
1197     @type sort: boolean
1198     @param sort: perform sorting on the returned job ids
1199     @rtype: list
1200     @return: the list of job IDs
1201
1202     """
1203     jlist = []
1204     for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1205       m = self._RE_JOB_FILE.match(filename)
1206       if m:
1207         jlist.append(m.group(1))
1208     if sort:
1209       jlist = utils.NiceSort(jlist)
1210     return jlist
1211
1212   def _LoadJobUnlocked(self, job_id):
1213     """Loads a job from the disk or memory.
1214
1215     Given a job id, this will return the cached job object if
1216     existing, or try to load the job from the disk. If loading from
1217     disk, it will also add the job to the cache.
1218
1219     @param job_id: the job id
1220     @rtype: L{_QueuedJob} or None
1221     @return: either None or the job object
1222
1223     """
1224     job = self._memcache.get(job_id, None)
1225     if job:
1226       logging.debug("Found job %s in memcache", job_id)
1227       return job
1228
1229     try:
1230       job = self._LoadJobFromDisk(job_id)
1231       if job is None:
1232         return job
1233     except errors.JobFileCorrupted:
1234       old_path = self._GetJobPath(job_id)
1235       new_path = self._GetArchivedJobPath(job_id)
1236       if old_path == new_path:
1237         # job already archived (future case)
1238         logging.exception("Can't parse job %s", job_id)
1239       else:
1240         # non-archived case
1241         logging.exception("Can't parse job %s, will archive.", job_id)
1242         self._RenameFilesUnlocked([(old_path, new_path)])
1243       return None
1244
1245     self._memcache[job_id] = job
1246     logging.debug("Added job %s to the cache", job_id)
1247     return job
1248
1249   def _LoadJobFromDisk(self, job_id):
1250     """Load the given job file from disk.
1251
1252     Given a job file, read, load and restore it in a _QueuedJob format.
1253
1254     @type job_id: string
1255     @param job_id: job identifier
1256     @rtype: L{_QueuedJob} or None
1257     @return: either None or the job object
1258
1259     """
1260     filepath = self._GetJobPath(job_id)
1261     logging.debug("Loading job from %s", filepath)
1262     try:
1263       raw_data = utils.ReadFile(filepath)
1264     except EnvironmentError, err:
1265       if err.errno in (errno.ENOENT, ):
1266         return None
1267       raise
1268
1269     try:
1270       data = serializer.LoadJson(raw_data)
1271       job = _QueuedJob.Restore(self, data)
1272     except Exception, err: # pylint: disable-msg=W0703
1273       raise errors.JobFileCorrupted(err)
1274
1275     return job
1276
1277   def SafeLoadJobFromDisk(self, job_id):
1278     """Load the given job file from disk.
1279
1280     Given a job file, read, load and restore it in a _QueuedJob format.
1281     In case of error reading the job, it gets returned as None, and the
1282     exception is logged.
1283
1284     @type job_id: string
1285     @param job_id: job identifier
1286     @rtype: L{_QueuedJob} or None
1287     @return: either None or the job object
1288
1289     """
1290     try:
1291       return self._LoadJobFromDisk(job_id)
1292     except (errors.JobFileCorrupted, EnvironmentError):
1293       logging.exception("Can't load/parse job %s", job_id)
1294       return None
1295
1296   @staticmethod
1297   def _IsQueueMarkedDrain():
1298     """Check if the queue is marked from drain.
1299
1300     This currently uses the queue drain file, which makes it a
1301     per-node flag. In the future this can be moved to the config file.
1302
1303     @rtype: boolean
1304     @return: True of the job queue is marked for draining
1305
1306     """
1307     return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1308
1309   def _UpdateQueueSizeUnlocked(self):
1310     """Update the queue size.
1311
1312     """
1313     self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1314
1315   @locking.ssynchronized(_LOCK)
1316   @_RequireOpenQueue
1317   def SetDrainFlag(self, drain_flag):
1318     """Sets the drain flag for the queue.
1319
1320     @type drain_flag: boolean
1321     @param drain_flag: Whether to set or unset the drain flag
1322
1323     """
1324     if drain_flag:
1325       utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1326     else:
1327       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1328
1329     self._drained = drain_flag
1330
1331     return True
1332
1333   @_RequireOpenQueue
1334   def _SubmitJobUnlocked(self, job_id, ops):
1335     """Create and store a new job.
1336
1337     This enters the job into our job queue and also puts it on the new
1338     queue, in order for it to be picked up by the queue processors.
1339
1340     @type job_id: job ID
1341     @param job_id: the job ID for the new job
1342     @type ops: list
1343     @param ops: The list of OpCodes that will become the new job.
1344     @rtype: L{_QueuedJob}
1345     @return: the job object to be queued
1346     @raise errors.JobQueueDrainError: if the job queue is marked for draining
1347     @raise errors.JobQueueFull: if the job queue has too many jobs in it
1348
1349     """
1350     # Ok when sharing the big job queue lock, as the drain file is created when
1351     # the lock is exclusive.
1352     if self._drained:
1353       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1354
1355     if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1356       raise errors.JobQueueFull()
1357
1358     job = _QueuedJob(self, job_id, ops)
1359
1360     # Write to disk
1361     self.UpdateJobUnlocked(job)
1362
1363     self._queue_size += 1
1364
1365     logging.debug("Adding new job %s to the cache", job_id)
1366     self._memcache[job_id] = job
1367
1368     return job
1369
1370   @locking.ssynchronized(_LOCK)
1371   @_RequireOpenQueue
1372   def SubmitJob(self, ops):
1373     """Create and store a new job.
1374
1375     @see: L{_SubmitJobUnlocked}
1376
1377     """
1378     job_id = self._NewSerialsUnlocked(1)[0]
1379     self._wpool.AddTask((self._SubmitJobUnlocked(job_id, ops), ))
1380     return job_id
1381
1382   @locking.ssynchronized(_LOCK)
1383   @_RequireOpenQueue
1384   def SubmitManyJobs(self, jobs):
1385     """Create and store multiple jobs.
1386
1387     @see: L{_SubmitJobUnlocked}
1388
1389     """
1390     results = []
1391     tasks = []
1392     all_job_ids = self._NewSerialsUnlocked(len(jobs))
1393     for job_id, ops in zip(all_job_ids, jobs):
1394       try:
1395         tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
1396         status = True
1397         data = job_id
1398       except errors.GenericError, err:
1399         data = str(err)
1400         status = False
1401       results.append((status, data))
1402     self._wpool.AddManyTasks(tasks)
1403
1404     return results
1405
1406   @_RequireOpenQueue
1407   def UpdateJobUnlocked(self, job, replicate=True):
1408     """Update a job's on disk storage.
1409
1410     After a job has been modified, this function needs to be called in
1411     order to write the changes to disk and replicate them to the other
1412     nodes.
1413
1414     @type job: L{_QueuedJob}
1415     @param job: the changed job
1416     @type replicate: boolean
1417     @param replicate: whether to replicate the change to remote nodes
1418
1419     """
1420     filename = self._GetJobPath(job.id)
1421     data = serializer.DumpJson(job.Serialize(), indent=False)
1422     logging.debug("Writing job %s to %s", job.id, filename)
1423     self._UpdateJobQueueFile(filename, data, replicate)
1424
1425   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1426                         timeout):
1427     """Waits for changes in a job.
1428
1429     @type job_id: string
1430     @param job_id: Job identifier
1431     @type fields: list of strings
1432     @param fields: Which fields to check for changes
1433     @type prev_job_info: list or None
1434     @param prev_job_info: Last job information returned
1435     @type prev_log_serial: int
1436     @param prev_log_serial: Last job message serial number
1437     @type timeout: float
1438     @param timeout: maximum time to wait in seconds
1439     @rtype: tuple (job info, log entries)
1440     @return: a tuple of the job information as required via
1441         the fields parameter, and the log entries as a list
1442
1443         if the job has not changed and the timeout has expired,
1444         we instead return a special value,
1445         L{constants.JOB_NOTCHANGED}, which should be interpreted
1446         as such by the clients
1447
1448     """
1449     load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
1450
1451     helper = _WaitForJobChangesHelper()
1452
1453     return helper(self._GetJobPath(job_id), load_fn,
1454                   fields, prev_job_info, prev_log_serial, timeout)
1455
1456   @locking.ssynchronized(_LOCK)
1457   @_RequireOpenQueue
1458   def CancelJob(self, job_id):
1459     """Cancels a job.
1460
1461     This will only succeed if the job has not started yet.
1462
1463     @type job_id: string
1464     @param job_id: job ID of job to be cancelled.
1465
1466     """
1467     logging.info("Cancelling job %s", job_id)
1468
1469     job = self._LoadJobUnlocked(job_id)
1470     if not job:
1471       logging.debug("Job %s not found", job_id)
1472       return (False, "Job %s not found" % job_id)
1473
1474     job_status = job.CalcStatus()
1475
1476     if job_status not in (constants.JOB_STATUS_QUEUED,
1477                           constants.JOB_STATUS_WAITLOCK):
1478       logging.debug("Job %s is no longer waiting in the queue", job.id)
1479       return (False, "Job %s is no longer waiting in the queue" % job.id)
1480
1481     if job_status == constants.JOB_STATUS_QUEUED:
1482       job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1483                             "Job canceled by request")
1484       return (True, "Job %s canceled" % job.id)
1485
1486     elif job_status == constants.JOB_STATUS_WAITLOCK:
1487       # The worker will notice the new status and cancel the job
1488       job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1489       return (True, "Job %s will be canceled" % job.id)
1490
1491   @_RequireOpenQueue
1492   def _ArchiveJobsUnlocked(self, jobs):
1493     """Archives jobs.
1494
1495     @type jobs: list of L{_QueuedJob}
1496     @param jobs: Job objects
1497     @rtype: int
1498     @return: Number of archived jobs
1499
1500     """
1501     archive_jobs = []
1502     rename_files = []
1503     for job in jobs:
1504       if job.CalcStatus() not in constants.JOBS_FINALIZED:
1505         logging.debug("Job %s is not yet done", job.id)
1506         continue
1507
1508       archive_jobs.append(job)
1509
1510       old = self._GetJobPath(job.id)
1511       new = self._GetArchivedJobPath(job.id)
1512       rename_files.append((old, new))
1513
1514     # TODO: What if 1..n files fail to rename?
1515     self._RenameFilesUnlocked(rename_files)
1516
1517     logging.debug("Successfully archived job(s) %s",
1518                   utils.CommaJoin(job.id for job in archive_jobs))
1519
1520     # Since we haven't quite checked, above, if we succeeded or failed renaming
1521     # the files, we update the cached queue size from the filesystem. When we
1522     # get around to fix the TODO: above, we can use the number of actually
1523     # archived jobs to fix this.
1524     self._UpdateQueueSizeUnlocked()
1525     return len(archive_jobs)
1526
1527   @locking.ssynchronized(_LOCK)
1528   @_RequireOpenQueue
1529   def ArchiveJob(self, job_id):
1530     """Archives a job.
1531
1532     This is just a wrapper over L{_ArchiveJobsUnlocked}.
1533
1534     @type job_id: string
1535     @param job_id: Job ID of job to be archived.
1536     @rtype: bool
1537     @return: Whether job was archived
1538
1539     """
1540     logging.info("Archiving job %s", job_id)
1541
1542     job = self._LoadJobUnlocked(job_id)
1543     if not job:
1544       logging.debug("Job %s not found", job_id)
1545       return False
1546
1547     return self._ArchiveJobsUnlocked([job]) == 1
1548
1549   @locking.ssynchronized(_LOCK)
1550   @_RequireOpenQueue
1551   def AutoArchiveJobs(self, age, timeout):
1552     """Archives all jobs based on age.
1553
1554     The method will archive all jobs which are older than the age
1555     parameter. For jobs that don't have an end timestamp, the start
1556     timestamp will be considered. The special '-1' age will cause
1557     archival of all jobs (that are not running or queued).
1558
1559     @type age: int
1560     @param age: the minimum age in seconds
1561
1562     """
1563     logging.info("Archiving jobs with age more than %s seconds", age)
1564
1565     now = time.time()
1566     end_time = now + timeout
1567     archived_count = 0
1568     last_touched = 0
1569
1570     all_job_ids = self._GetJobIDsUnlocked()
1571     pending = []
1572     for idx, job_id in enumerate(all_job_ids):
1573       last_touched = idx + 1
1574
1575       # Not optimal because jobs could be pending
1576       # TODO: Measure average duration for job archival and take number of
1577       # pending jobs into account.
1578       if time.time() > end_time:
1579         break
1580
1581       # Returns None if the job failed to load
1582       job = self._LoadJobUnlocked(job_id)
1583       if job:
1584         if job.end_timestamp is None:
1585           if job.start_timestamp is None:
1586             job_age = job.received_timestamp
1587           else:
1588             job_age = job.start_timestamp
1589         else:
1590           job_age = job.end_timestamp
1591
1592         if age == -1 or now - job_age[0] > age:
1593           pending.append(job)
1594
1595           # Archive 10 jobs at a time
1596           if len(pending) >= 10:
1597             archived_count += self._ArchiveJobsUnlocked(pending)
1598             pending = []
1599
1600     if pending:
1601       archived_count += self._ArchiveJobsUnlocked(pending)
1602
1603     return (archived_count, len(all_job_ids) - last_touched)
1604
1605   def QueryJobs(self, job_ids, fields):
1606     """Returns a list of jobs in queue.
1607
1608     @type job_ids: list
1609     @param job_ids: sequence of job identifiers or None for all
1610     @type fields: list
1611     @param fields: names of fields to return
1612     @rtype: list
1613     @return: list one element per job, each element being list with
1614         the requested fields
1615
1616     """
1617     jobs = []
1618     list_all = False
1619     if not job_ids:
1620       # Since files are added to/removed from the queue atomically, there's no
1621       # risk of getting the job ids in an inconsistent state.
1622       job_ids = self._GetJobIDsUnlocked()
1623       list_all = True
1624
1625     for job_id in job_ids:
1626       job = self.SafeLoadJobFromDisk(job_id)
1627       if job is not None:
1628         jobs.append(job.GetInfo(fields))
1629       elif not list_all:
1630         jobs.append(None)
1631
1632     return jobs
1633
1634   @locking.ssynchronized(_LOCK)
1635   @_RequireOpenQueue
1636   def Shutdown(self):
1637     """Stops the job queue.
1638
1639     This shutdowns all the worker threads an closes the queue.
1640
1641     """
1642     self._wpool.TerminateWorkers()
1643
1644     self._queue_filelock.Close()
1645     self._queue_filelock = None