e2a9baa1e41e03a737fc989e26bb9121a09d0b3a
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import os
33 import logging
34 import errno
35 import re
36 import time
37 import weakref
38
39 try:
40   # pylint: disable-msg=E0611
41   from pyinotify import pyinotify
42 except ImportError:
43   import pyinotify
44
45 from ganeti import asyncnotifier
46 from ganeti import constants
47 from ganeti import serializer
48 from ganeti import workerpool
49 from ganeti import locking
50 from ganeti import opcodes
51 from ganeti import errors
52 from ganeti import mcpu
53 from ganeti import utils
54 from ganeti import jstore
55 from ganeti import rpc
56 from ganeti import netutils
57 from ganeti import compat
58
59
60 JOBQUEUE_THREADS = 25
61 JOBS_PER_ARCHIVE_DIRECTORY = 10000
62
63 # member lock names to be passed to @ssynchronized decorator
64 _LOCK = "_lock"
65 _QUEUE = "_queue"
66
67
68 class CancelJob(Exception):
69   """Special exception to cancel a job.
70
71   """
72
73
74 def TimeStampNow():
75   """Returns the current timestamp.
76
77   @rtype: tuple
78   @return: the current time in the (seconds, microseconds) format
79
80   """
81   return utils.SplitTime(time.time())
82
83
84 class _QueuedOpCode(object):
85   """Encapsulates an opcode object.
86
87   @ivar log: holds the execution log and consists of tuples
88   of the form C{(log_serial, timestamp, level, message)}
89   @ivar input: the OpCode we encapsulate
90   @ivar status: the current status
91   @ivar result: the result of the LU execution
92   @ivar start_timestamp: timestamp for the start of the execution
93   @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
94   @ivar stop_timestamp: timestamp for the end of the execution
95
96   """
97   __slots__ = ["input", "status", "result", "log",
98                "start_timestamp", "exec_timestamp", "end_timestamp",
99                "__weakref__"]
100
101   def __init__(self, op):
102     """Constructor for the _QuededOpCode.
103
104     @type op: L{opcodes.OpCode}
105     @param op: the opcode we encapsulate
106
107     """
108     self.input = op
109     self.status = constants.OP_STATUS_QUEUED
110     self.result = None
111     self.log = []
112     self.start_timestamp = None
113     self.exec_timestamp = None
114     self.end_timestamp = None
115
116   @classmethod
117   def Restore(cls, state):
118     """Restore the _QueuedOpCode from the serialized form.
119
120     @type state: dict
121     @param state: the serialized state
122     @rtype: _QueuedOpCode
123     @return: a new _QueuedOpCode instance
124
125     """
126     obj = _QueuedOpCode.__new__(cls)
127     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
128     obj.status = state["status"]
129     obj.result = state["result"]
130     obj.log = state["log"]
131     obj.start_timestamp = state.get("start_timestamp", None)
132     obj.exec_timestamp = state.get("exec_timestamp", None)
133     obj.end_timestamp = state.get("end_timestamp", None)
134     return obj
135
136   def Serialize(self):
137     """Serializes this _QueuedOpCode.
138
139     @rtype: dict
140     @return: the dictionary holding the serialized state
141
142     """
143     return {
144       "input": self.input.__getstate__(),
145       "status": self.status,
146       "result": self.result,
147       "log": self.log,
148       "start_timestamp": self.start_timestamp,
149       "exec_timestamp": self.exec_timestamp,
150       "end_timestamp": self.end_timestamp,
151       }
152
153
154 class _QueuedJob(object):
155   """In-memory job representation.
156
157   This is what we use to track the user-submitted jobs. Locking must
158   be taken care of by users of this class.
159
160   @type queue: L{JobQueue}
161   @ivar queue: the parent queue
162   @ivar id: the job ID
163   @type ops: list
164   @ivar ops: the list of _QueuedOpCode that constitute the job
165   @type log_serial: int
166   @ivar log_serial: holds the index for the next log entry
167   @ivar received_timestamp: the timestamp for when the job was received
168   @ivar start_timestmap: the timestamp for start of execution
169   @ivar end_timestamp: the timestamp for end of execution
170
171   """
172   # pylint: disable-msg=W0212
173   __slots__ = ["queue", "id", "ops", "log_serial",
174                "received_timestamp", "start_timestamp", "end_timestamp",
175                "__weakref__"]
176
177   def __init__(self, queue, job_id, ops):
178     """Constructor for the _QueuedJob.
179
180     @type queue: L{JobQueue}
181     @param queue: our parent queue
182     @type job_id: job_id
183     @param job_id: our job id
184     @type ops: list
185     @param ops: the list of opcodes we hold, which will be encapsulated
186         in _QueuedOpCodes
187
188     """
189     if not ops:
190       raise errors.GenericError("A job needs at least one opcode")
191
192     self.queue = queue
193     self.id = job_id
194     self.ops = [_QueuedOpCode(op) for op in ops]
195     self.log_serial = 0
196     self.received_timestamp = TimeStampNow()
197     self.start_timestamp = None
198     self.end_timestamp = None
199
200   def __repr__(self):
201     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
202               "id=%s" % self.id,
203               "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
204
205     return "<%s at %#x>" % (" ".join(status), id(self))
206
207   @classmethod
208   def Restore(cls, queue, state):
209     """Restore a _QueuedJob from serialized state:
210
211     @type queue: L{JobQueue}
212     @param queue: to which queue the restored job belongs
213     @type state: dict
214     @param state: the serialized state
215     @rtype: _JobQueue
216     @return: the restored _JobQueue instance
217
218     """
219     obj = _QueuedJob.__new__(cls)
220     obj.queue = queue
221     obj.id = state["id"]
222     obj.received_timestamp = state.get("received_timestamp", None)
223     obj.start_timestamp = state.get("start_timestamp", None)
224     obj.end_timestamp = state.get("end_timestamp", None)
225
226     obj.ops = []
227     obj.log_serial = 0
228     for op_state in state["ops"]:
229       op = _QueuedOpCode.Restore(op_state)
230       for log_entry in op.log:
231         obj.log_serial = max(obj.log_serial, log_entry[0])
232       obj.ops.append(op)
233
234     return obj
235
236   def Serialize(self):
237     """Serialize the _JobQueue instance.
238
239     @rtype: dict
240     @return: the serialized state
241
242     """
243     return {
244       "id": self.id,
245       "ops": [op.Serialize() for op in self.ops],
246       "start_timestamp": self.start_timestamp,
247       "end_timestamp": self.end_timestamp,
248       "received_timestamp": self.received_timestamp,
249       }
250
251   def CalcStatus(self):
252     """Compute the status of this job.
253
254     This function iterates over all the _QueuedOpCodes in the job and
255     based on their status, computes the job status.
256
257     The algorithm is:
258       - if we find a cancelled, or finished with error, the job
259         status will be the same
260       - otherwise, the last opcode with the status one of:
261           - waitlock
262           - canceling
263           - running
264
265         will determine the job status
266
267       - otherwise, it means either all opcodes are queued, or success,
268         and the job status will be the same
269
270     @return: the job status
271
272     """
273     status = constants.JOB_STATUS_QUEUED
274
275     all_success = True
276     for op in self.ops:
277       if op.status == constants.OP_STATUS_SUCCESS:
278         continue
279
280       all_success = False
281
282       if op.status == constants.OP_STATUS_QUEUED:
283         pass
284       elif op.status == constants.OP_STATUS_WAITLOCK:
285         status = constants.JOB_STATUS_WAITLOCK
286       elif op.status == constants.OP_STATUS_RUNNING:
287         status = constants.JOB_STATUS_RUNNING
288       elif op.status == constants.OP_STATUS_CANCELING:
289         status = constants.JOB_STATUS_CANCELING
290         break
291       elif op.status == constants.OP_STATUS_ERROR:
292         status = constants.JOB_STATUS_ERROR
293         # The whole job fails if one opcode failed
294         break
295       elif op.status == constants.OP_STATUS_CANCELED:
296         status = constants.OP_STATUS_CANCELED
297         break
298
299     if all_success:
300       status = constants.JOB_STATUS_SUCCESS
301
302     return status
303
304   def GetLogEntries(self, newer_than):
305     """Selectively returns the log entries.
306
307     @type newer_than: None or int
308     @param newer_than: if this is None, return all log entries,
309         otherwise return only the log entries with serial higher
310         than this value
311     @rtype: list
312     @return: the list of the log entries selected
313
314     """
315     if newer_than is None:
316       serial = -1
317     else:
318       serial = newer_than
319
320     entries = []
321     for op in self.ops:
322       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
323
324     return entries
325
326   def GetInfo(self, fields):
327     """Returns information about a job.
328
329     @type fields: list
330     @param fields: names of fields to return
331     @rtype: list
332     @return: list with one element for each field
333     @raise errors.OpExecError: when an invalid field
334         has been passed
335
336     """
337     row = []
338     for fname in fields:
339       if fname == "id":
340         row.append(self.id)
341       elif fname == "status":
342         row.append(self.CalcStatus())
343       elif fname == "ops":
344         row.append([op.input.__getstate__() for op in self.ops])
345       elif fname == "opresult":
346         row.append([op.result for op in self.ops])
347       elif fname == "opstatus":
348         row.append([op.status for op in self.ops])
349       elif fname == "oplog":
350         row.append([op.log for op in self.ops])
351       elif fname == "opstart":
352         row.append([op.start_timestamp for op in self.ops])
353       elif fname == "opexec":
354         row.append([op.exec_timestamp for op in self.ops])
355       elif fname == "opend":
356         row.append([op.end_timestamp for op in self.ops])
357       elif fname == "received_ts":
358         row.append(self.received_timestamp)
359       elif fname == "start_ts":
360         row.append(self.start_timestamp)
361       elif fname == "end_ts":
362         row.append(self.end_timestamp)
363       elif fname == "summary":
364         row.append([op.input.Summary() for op in self.ops])
365       else:
366         raise errors.OpExecError("Invalid self query field '%s'" % fname)
367     return row
368
369   def MarkUnfinishedOps(self, status, result):
370     """Mark unfinished opcodes with a given status and result.
371
372     This is an utility function for marking all running or waiting to
373     be run opcodes with a given status. Opcodes which are already
374     finalised are not changed.
375
376     @param status: a given opcode status
377     @param result: the opcode result
378
379     """
380     try:
381       not_marked = True
382       for op in self.ops:
383         if op.status in constants.OPS_FINALIZED:
384           assert not_marked, "Finalized opcodes found after non-finalized ones"
385           continue
386         op.status = status
387         op.result = result
388         not_marked = False
389     finally:
390       self.queue.UpdateJobUnlocked(self)
391
392
393 class _OpExecCallbacks(mcpu.OpExecCbBase):
394   def __init__(self, queue, job, op):
395     """Initializes this class.
396
397     @type queue: L{JobQueue}
398     @param queue: Job queue
399     @type job: L{_QueuedJob}
400     @param job: Job object
401     @type op: L{_QueuedOpCode}
402     @param op: OpCode
403
404     """
405     assert queue, "Queue is missing"
406     assert job, "Job is missing"
407     assert op, "Opcode is missing"
408
409     self._queue = queue
410     self._job = job
411     self._op = op
412
413   def _CheckCancel(self):
414     """Raises an exception to cancel the job if asked to.
415
416     """
417     # Cancel here if we were asked to
418     if self._op.status == constants.OP_STATUS_CANCELING:
419       logging.debug("Canceling opcode")
420       raise CancelJob()
421
422   @locking.ssynchronized(_QUEUE, shared=1)
423   def NotifyStart(self):
424     """Mark the opcode as running, not lock-waiting.
425
426     This is called from the mcpu code as a notifier function, when the LU is
427     finally about to start the Exec() method. Of course, to have end-user
428     visible results, the opcode must be initially (before calling into
429     Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
430
431     """
432     assert self._op in self._job.ops
433     assert self._op.status in (constants.OP_STATUS_WAITLOCK,
434                                constants.OP_STATUS_CANCELING)
435
436     # Cancel here if we were asked to
437     self._CheckCancel()
438
439     logging.debug("Opcode is now running")
440
441     self._op.status = constants.OP_STATUS_RUNNING
442     self._op.exec_timestamp = TimeStampNow()
443
444     # And finally replicate the job status
445     self._queue.UpdateJobUnlocked(self._job)
446
447   @locking.ssynchronized(_QUEUE, shared=1)
448   def _AppendFeedback(self, timestamp, log_type, log_msg):
449     """Internal feedback append function, with locks
450
451     """
452     self._job.log_serial += 1
453     self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
454     self._queue.UpdateJobUnlocked(self._job, replicate=False)
455
456   def Feedback(self, *args):
457     """Append a log entry.
458
459     """
460     assert len(args) < 3
461
462     if len(args) == 1:
463       log_type = constants.ELOG_MESSAGE
464       log_msg = args[0]
465     else:
466       (log_type, log_msg) = args
467
468     # The time is split to make serialization easier and not lose
469     # precision.
470     timestamp = utils.SplitTime(time.time())
471     self._AppendFeedback(timestamp, log_type, log_msg)
472
473   def ReportLocks(self, msg):
474     """Write locking information to the job.
475
476     Called whenever the LU processor is waiting for a lock or has acquired one.
477
478     """
479     assert self._op.status in (constants.OP_STATUS_WAITLOCK,
480                                constants.OP_STATUS_CANCELING)
481
482     # Cancel here if we were asked to
483     self._CheckCancel()
484
485
486 class _JobChangesChecker(object):
487   def __init__(self, fields, prev_job_info, prev_log_serial):
488     """Initializes this class.
489
490     @type fields: list of strings
491     @param fields: Fields requested by LUXI client
492     @type prev_job_info: string
493     @param prev_job_info: previous job info, as passed by the LUXI client
494     @type prev_log_serial: string
495     @param prev_log_serial: previous job serial, as passed by the LUXI client
496
497     """
498     self._fields = fields
499     self._prev_job_info = prev_job_info
500     self._prev_log_serial = prev_log_serial
501
502   def __call__(self, job):
503     """Checks whether job has changed.
504
505     @type job: L{_QueuedJob}
506     @param job: Job object
507
508     """
509     status = job.CalcStatus()
510     job_info = job.GetInfo(self._fields)
511     log_entries = job.GetLogEntries(self._prev_log_serial)
512
513     # Serializing and deserializing data can cause type changes (e.g. from
514     # tuple to list) or precision loss. We're doing it here so that we get
515     # the same modifications as the data received from the client. Without
516     # this, the comparison afterwards might fail without the data being
517     # significantly different.
518     # TODO: we just deserialized from disk, investigate how to make sure that
519     # the job info and log entries are compatible to avoid this further step.
520     # TODO: Doing something like in testutils.py:UnifyValueType might be more
521     # efficient, though floats will be tricky
522     job_info = serializer.LoadJson(serializer.DumpJson(job_info))
523     log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
524
525     # Don't even try to wait if the job is no longer running, there will be
526     # no changes.
527     if (status not in (constants.JOB_STATUS_QUEUED,
528                        constants.JOB_STATUS_RUNNING,
529                        constants.JOB_STATUS_WAITLOCK) or
530         job_info != self._prev_job_info or
531         (log_entries and self._prev_log_serial != log_entries[0][0])):
532       logging.debug("Job %s changed", job.id)
533       return (job_info, log_entries)
534
535     return None
536
537
538 class _JobFileChangesWaiter(object):
539   def __init__(self, filename):
540     """Initializes this class.
541
542     @type filename: string
543     @param filename: Path to job file
544     @raises errors.InotifyError: if the notifier cannot be setup
545
546     """
547     self._wm = pyinotify.WatchManager()
548     self._inotify_handler = \
549       asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
550     self._notifier = \
551       pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
552     try:
553       self._inotify_handler.enable()
554     except Exception:
555       # pyinotify doesn't close file descriptors automatically
556       self._notifier.stop()
557       raise
558
559   def _OnInotify(self, notifier_enabled):
560     """Callback for inotify.
561
562     """
563     if not notifier_enabled:
564       self._inotify_handler.enable()
565
566   def Wait(self, timeout):
567     """Waits for the job file to change.
568
569     @type timeout: float
570     @param timeout: Timeout in seconds
571     @return: Whether there have been events
572
573     """
574     assert timeout >= 0
575     have_events = self._notifier.check_events(timeout * 1000)
576     if have_events:
577       self._notifier.read_events()
578     self._notifier.process_events()
579     return have_events
580
581   def Close(self):
582     """Closes underlying notifier and its file descriptor.
583
584     """
585     self._notifier.stop()
586
587
588 class _JobChangesWaiter(object):
589   def __init__(self, filename):
590     """Initializes this class.
591
592     @type filename: string
593     @param filename: Path to job file
594
595     """
596     self._filewaiter = None
597     self._filename = filename
598
599   def Wait(self, timeout):
600     """Waits for a job to change.
601
602     @type timeout: float
603     @param timeout: Timeout in seconds
604     @return: Whether there have been events
605
606     """
607     if self._filewaiter:
608       return self._filewaiter.Wait(timeout)
609
610     # Lazy setup: Avoid inotify setup cost when job file has already changed.
611     # If this point is reached, return immediately and let caller check the job
612     # file again in case there were changes since the last check. This avoids a
613     # race condition.
614     self._filewaiter = _JobFileChangesWaiter(self._filename)
615
616     return True
617
618   def Close(self):
619     """Closes underlying waiter.
620
621     """
622     if self._filewaiter:
623       self._filewaiter.Close()
624
625
626 class _WaitForJobChangesHelper(object):
627   """Helper class using inotify to wait for changes in a job file.
628
629   This class takes a previous job status and serial, and alerts the client when
630   the current job status has changed.
631
632   """
633   @staticmethod
634   def _CheckForChanges(job_load_fn, check_fn):
635     job = job_load_fn()
636     if not job:
637       raise errors.JobLost()
638
639     result = check_fn(job)
640     if result is None:
641       raise utils.RetryAgain()
642
643     return result
644
645   def __call__(self, filename, job_load_fn,
646                fields, prev_job_info, prev_log_serial, timeout):
647     """Waits for changes on a job.
648
649     @type filename: string
650     @param filename: File on which to wait for changes
651     @type job_load_fn: callable
652     @param job_load_fn: Function to load job
653     @type fields: list of strings
654     @param fields: Which fields to check for changes
655     @type prev_job_info: list or None
656     @param prev_job_info: Last job information returned
657     @type prev_log_serial: int
658     @param prev_log_serial: Last job message serial number
659     @type timeout: float
660     @param timeout: maximum time to wait in seconds
661
662     """
663     try:
664       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
665       waiter = _JobChangesWaiter(filename)
666       try:
667         return utils.Retry(compat.partial(self._CheckForChanges,
668                                           job_load_fn, check_fn),
669                            utils.RETRY_REMAINING_TIME, timeout,
670                            wait_fn=waiter.Wait)
671       finally:
672         waiter.Close()
673     except (errors.InotifyError, errors.JobLost):
674       return None
675     except utils.RetryTimeout:
676       return constants.JOB_NOTCHANGED
677
678
679 def _EncodeOpError(err):
680   """Encodes an error which occurred while processing an opcode.
681
682   """
683   if isinstance(err, errors.GenericError):
684     to_encode = err
685   else:
686     to_encode = errors.OpExecError(str(err))
687
688   return errors.EncodeException(to_encode)
689
690
691 class _JobQueueWorker(workerpool.BaseWorker):
692   """The actual job workers.
693
694   """
695   def RunTask(self, job): # pylint: disable-msg=W0221
696     """Job executor.
697
698     This functions processes a job. It is closely tied to the _QueuedJob and
699     _QueuedOpCode classes.
700
701     @type job: L{_QueuedJob}
702     @param job: the job to be processed
703
704     """
705     self.SetTaskName("Job%s" % job.id)
706
707     logging.info("Processing job %s", job.id)
708     proc = mcpu.Processor(self.pool.queue.context, job.id)
709     queue = job.queue
710     try:
711       try:
712         count = len(job.ops)
713         for idx, op in enumerate(job.ops):
714           op_summary = op.input.Summary()
715           if op.status == constants.OP_STATUS_SUCCESS:
716             # this is a job that was partially completed before master
717             # daemon shutdown, so it can be expected that some opcodes
718             # are already completed successfully (if any did error
719             # out, then the whole job should have been aborted and not
720             # resubmitted for processing)
721             logging.info("Op %s/%s: opcode %s already processed, skipping",
722                          idx + 1, count, op_summary)
723             continue
724           try:
725             logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
726                          op_summary)
727
728             queue.acquire(shared=1)
729             try:
730               if op.status == constants.OP_STATUS_CANCELED:
731                 logging.debug("Canceling opcode")
732                 raise CancelJob()
733               assert op.status == constants.OP_STATUS_QUEUED
734               logging.debug("Opcode %s/%s waiting for locks",
735                             idx + 1, count)
736               op.status = constants.OP_STATUS_WAITLOCK
737               op.result = None
738               op.start_timestamp = TimeStampNow()
739               if idx == 0: # first opcode
740                 job.start_timestamp = op.start_timestamp
741               queue.UpdateJobUnlocked(job)
742
743               input_opcode = op.input
744             finally:
745               queue.release()
746
747             # Make sure not to hold queue lock while calling ExecOpCode
748             result = proc.ExecOpCode(input_opcode,
749                                      _OpExecCallbacks(queue, job, op))
750
751             queue.acquire(shared=1)
752             try:
753               logging.debug("Opcode %s/%s succeeded", idx + 1, count)
754               op.status = constants.OP_STATUS_SUCCESS
755               op.result = result
756               op.end_timestamp = TimeStampNow()
757               if idx == count - 1:
758                 job.end_timestamp = TimeStampNow()
759
760                 # Consistency check
761                 assert compat.all(i.status == constants.OP_STATUS_SUCCESS
762                                   for i in job.ops)
763
764               queue.UpdateJobUnlocked(job)
765             finally:
766               queue.release()
767
768             logging.info("Op %s/%s: Successfully finished opcode %s",
769                          idx + 1, count, op_summary)
770           except CancelJob:
771             # Will be handled further up
772             raise
773           except Exception, err:
774             queue.acquire(shared=1)
775             try:
776               try:
777                 logging.debug("Opcode %s/%s failed", idx + 1, count)
778                 op.status = constants.OP_STATUS_ERROR
779                 op.result = _EncodeOpError(err)
780                 op.end_timestamp = TimeStampNow()
781                 logging.info("Op %s/%s: Error in opcode %s: %s",
782                              idx + 1, count, op_summary, err)
783
784                 to_encode = errors.OpExecError("Preceding opcode failed")
785                 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
786                                       _EncodeOpError(to_encode))
787
788                 # Consistency check
789                 assert compat.all(i.status == constants.OP_STATUS_SUCCESS
790                                   for i in job.ops[:idx])
791                 assert compat.all(i.status == constants.OP_STATUS_ERROR and
792                                   errors.GetEncodedError(i.result)
793                                   for i in job.ops[idx:])
794               finally:
795                 job.end_timestamp = TimeStampNow()
796                 queue.UpdateJobUnlocked(job)
797             finally:
798               queue.release()
799             raise
800
801       except CancelJob:
802         queue.acquire(shared=1)
803         try:
804           job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
805                                 "Job canceled by request")
806           job.end_timestamp = TimeStampNow()
807           queue.UpdateJobUnlocked(job)
808         finally:
809           queue.release()
810       except errors.GenericError, err:
811         logging.exception("Ganeti exception")
812       except:
813         logging.exception("Unhandled exception")
814     finally:
815       status = job.CalcStatus()
816       logging.info("Finished job %s, status = %s", job.id, status)
817
818
819 class _JobQueueWorkerPool(workerpool.WorkerPool):
820   """Simple class implementing a job-processing workerpool.
821
822   """
823   def __init__(self, queue):
824     super(_JobQueueWorkerPool, self).__init__("JobQueue",
825                                               JOBQUEUE_THREADS,
826                                               _JobQueueWorker)
827     self.queue = queue
828
829
830 def _RequireOpenQueue(fn):
831   """Decorator for "public" functions.
832
833   This function should be used for all 'public' functions. That is,
834   functions usually called from other classes. Note that this should
835   be applied only to methods (not plain functions), since it expects
836   that the decorated function is called with a first argument that has
837   a '_queue_filelock' argument.
838
839   @warning: Use this decorator only after locking.ssynchronized
840
841   Example::
842     @locking.ssynchronized(_LOCK)
843     @_RequireOpenQueue
844     def Example(self):
845       pass
846
847   """
848   def wrapper(self, *args, **kwargs):
849     # pylint: disable-msg=W0212
850     assert self._queue_filelock is not None, "Queue should be open"
851     return fn(self, *args, **kwargs)
852   return wrapper
853
854
855 class JobQueue(object):
856   """Queue used to manage the jobs.
857
858   @cvar _RE_JOB_FILE: regex matching the valid job file names
859
860   """
861   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
862
863   def __init__(self, context):
864     """Constructor for JobQueue.
865
866     The constructor will initialize the job queue object and then
867     start loading the current jobs from disk, either for starting them
868     (if they were queue) or for aborting them (if they were already
869     running).
870
871     @type context: GanetiContext
872     @param context: the context object for access to the configuration
873         data and other ganeti objects
874
875     """
876     self.context = context
877     self._memcache = weakref.WeakValueDictionary()
878     self._my_hostname = netutils.HostInfo().name
879
880     # The Big JobQueue lock. If a code block or method acquires it in shared
881     # mode safe it must guarantee concurrency with all the code acquiring it in
882     # shared mode, including itself. In order not to acquire it at all
883     # concurrency must be guaranteed with all code acquiring it in shared mode
884     # and all code acquiring it exclusively.
885     self._lock = locking.SharedLock("JobQueue")
886
887     self.acquire = self._lock.acquire
888     self.release = self._lock.release
889
890     # Initialize the queue, and acquire the filelock.
891     # This ensures no other process is working on the job queue.
892     self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
893
894     # Read serial file
895     self._last_serial = jstore.ReadSerial()
896     assert self._last_serial is not None, ("Serial file was modified between"
897                                            " check in jstore and here")
898
899     # Get initial list of nodes
900     self._nodes = dict((n.name, n.primary_ip)
901                        for n in self.context.cfg.GetAllNodesInfo().values()
902                        if n.master_candidate)
903
904     # Remove master node
905     self._nodes.pop(self._my_hostname, None)
906
907     # TODO: Check consistency across nodes
908
909     self._queue_size = 0
910     self._UpdateQueueSizeUnlocked()
911     self._drained = self._IsQueueMarkedDrain()
912
913     # Setup worker pool
914     self._wpool = _JobQueueWorkerPool(self)
915     try:
916       # We need to lock here because WorkerPool.AddTask() may start a job while
917       # we're still doing our work.
918       self.acquire()
919       try:
920         logging.info("Inspecting job queue")
921
922         all_job_ids = self._GetJobIDsUnlocked()
923         jobs_count = len(all_job_ids)
924         lastinfo = time.time()
925         for idx, job_id in enumerate(all_job_ids):
926           # Give an update every 1000 jobs or 10 seconds
927           if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
928               idx == (jobs_count - 1)):
929             logging.info("Job queue inspection: %d/%d (%0.1f %%)",
930                          idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
931             lastinfo = time.time()
932
933           job = self._LoadJobUnlocked(job_id)
934
935           # a failure in loading the job can cause 'None' to be returned
936           if job is None:
937             continue
938
939           status = job.CalcStatus()
940
941           if status in (constants.JOB_STATUS_QUEUED, ):
942             self._wpool.AddTask((job, ))
943
944           elif status in (constants.JOB_STATUS_RUNNING,
945                           constants.JOB_STATUS_WAITLOCK,
946                           constants.JOB_STATUS_CANCELING):
947             logging.warning("Unfinished job %s found: %s", job.id, job)
948             job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
949                                   "Unclean master daemon shutdown")
950
951         logging.info("Job queue inspection finished")
952       finally:
953         self.release()
954     except:
955       self._wpool.TerminateWorkers()
956       raise
957
958   @locking.ssynchronized(_LOCK)
959   @_RequireOpenQueue
960   def AddNode(self, node):
961     """Register a new node with the queue.
962
963     @type node: L{objects.Node}
964     @param node: the node object to be added
965
966     """
967     node_name = node.name
968     assert node_name != self._my_hostname
969
970     # Clean queue directory on added node
971     result = rpc.RpcRunner.call_jobqueue_purge(node_name)
972     msg = result.fail_msg
973     if msg:
974       logging.warning("Cannot cleanup queue directory on node %s: %s",
975                       node_name, msg)
976
977     if not node.master_candidate:
978       # remove if existing, ignoring errors
979       self._nodes.pop(node_name, None)
980       # and skip the replication of the job ids
981       return
982
983     # Upload the whole queue excluding archived jobs
984     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
985
986     # Upload current serial file
987     files.append(constants.JOB_QUEUE_SERIAL_FILE)
988
989     for file_name in files:
990       # Read file content
991       content = utils.ReadFile(file_name)
992
993       result = rpc.RpcRunner.call_jobqueue_update([node_name],
994                                                   [node.primary_ip],
995                                                   file_name, content)
996       msg = result[node_name].fail_msg
997       if msg:
998         logging.error("Failed to upload file %s to node %s: %s",
999                       file_name, node_name, msg)
1000
1001     self._nodes[node_name] = node.primary_ip
1002
1003   @locking.ssynchronized(_LOCK)
1004   @_RequireOpenQueue
1005   def RemoveNode(self, node_name):
1006     """Callback called when removing nodes from the cluster.
1007
1008     @type node_name: str
1009     @param node_name: the name of the node to remove
1010
1011     """
1012     self._nodes.pop(node_name, None)
1013
1014   @staticmethod
1015   def _CheckRpcResult(result, nodes, failmsg):
1016     """Verifies the status of an RPC call.
1017
1018     Since we aim to keep consistency should this node (the current
1019     master) fail, we will log errors if our rpc fail, and especially
1020     log the case when more than half of the nodes fails.
1021
1022     @param result: the data as returned from the rpc call
1023     @type nodes: list
1024     @param nodes: the list of nodes we made the call to
1025     @type failmsg: str
1026     @param failmsg: the identifier to be used for logging
1027
1028     """
1029     failed = []
1030     success = []
1031
1032     for node in nodes:
1033       msg = result[node].fail_msg
1034       if msg:
1035         failed.append(node)
1036         logging.error("RPC call %s (%s) failed on node %s: %s",
1037                       result[node].call, failmsg, node, msg)
1038       else:
1039         success.append(node)
1040
1041     # +1 for the master node
1042     if (len(success) + 1) < len(failed):
1043       # TODO: Handle failing nodes
1044       logging.error("More than half of the nodes failed")
1045
1046   def _GetNodeIp(self):
1047     """Helper for returning the node name/ip list.
1048
1049     @rtype: (list, list)
1050     @return: a tuple of two lists, the first one with the node
1051         names and the second one with the node addresses
1052
1053     """
1054     # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1055     name_list = self._nodes.keys()
1056     addr_list = [self._nodes[name] for name in name_list]
1057     return name_list, addr_list
1058
1059   def _UpdateJobQueueFile(self, file_name, data, replicate):
1060     """Writes a file locally and then replicates it to all nodes.
1061
1062     This function will replace the contents of a file on the local
1063     node and then replicate it to all the other nodes we have.
1064
1065     @type file_name: str
1066     @param file_name: the path of the file to be replicated
1067     @type data: str
1068     @param data: the new contents of the file
1069     @type replicate: boolean
1070     @param replicate: whether to spread the changes to the remote nodes
1071
1072     """
1073     utils.WriteFile(file_name, data=data)
1074
1075     if replicate:
1076       names, addrs = self._GetNodeIp()
1077       result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1078       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1079
1080   def _RenameFilesUnlocked(self, rename):
1081     """Renames a file locally and then replicate the change.
1082
1083     This function will rename a file in the local queue directory
1084     and then replicate this rename to all the other nodes we have.
1085
1086     @type rename: list of (old, new)
1087     @param rename: List containing tuples mapping old to new names
1088
1089     """
1090     # Rename them locally
1091     for old, new in rename:
1092       utils.RenameFile(old, new, mkdir=True)
1093
1094     # ... and on all nodes
1095     names, addrs = self._GetNodeIp()
1096     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1097     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1098
1099   @staticmethod
1100   def _FormatJobID(job_id):
1101     """Convert a job ID to string format.
1102
1103     Currently this just does C{str(job_id)} after performing some
1104     checks, but if we want to change the job id format this will
1105     abstract this change.
1106
1107     @type job_id: int or long
1108     @param job_id: the numeric job id
1109     @rtype: str
1110     @return: the formatted job id
1111
1112     """
1113     if not isinstance(job_id, (int, long)):
1114       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1115     if job_id < 0:
1116       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1117
1118     return str(job_id)
1119
1120   @classmethod
1121   def _GetArchiveDirectory(cls, job_id):
1122     """Returns the archive directory for a job.
1123
1124     @type job_id: str
1125     @param job_id: Job identifier
1126     @rtype: str
1127     @return: Directory name
1128
1129     """
1130     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1131
1132   def _NewSerialsUnlocked(self, count):
1133     """Generates a new job identifier.
1134
1135     Job identifiers are unique during the lifetime of a cluster.
1136
1137     @type count: integer
1138     @param count: how many serials to return
1139     @rtype: str
1140     @return: a string representing the job identifier.
1141
1142     """
1143     assert count > 0
1144     # New number
1145     serial = self._last_serial + count
1146
1147     # Write to file
1148     self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1149                              "%s\n" % serial, True)
1150
1151     result = [self._FormatJobID(v)
1152               for v in range(self._last_serial, serial + 1)]
1153     # Keep it only if we were able to write the file
1154     self._last_serial = serial
1155
1156     return result
1157
1158   @staticmethod
1159   def _GetJobPath(job_id):
1160     """Returns the job file for a given job id.
1161
1162     @type job_id: str
1163     @param job_id: the job identifier
1164     @rtype: str
1165     @return: the path to the job file
1166
1167     """
1168     return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1169
1170   @classmethod
1171   def _GetArchivedJobPath(cls, job_id):
1172     """Returns the archived job file for a give job id.
1173
1174     @type job_id: str
1175     @param job_id: the job identifier
1176     @rtype: str
1177     @return: the path to the archived job file
1178
1179     """
1180     return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1181                           cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1182
1183   def _GetJobIDsUnlocked(self, sort=True):
1184     """Return all known job IDs.
1185
1186     The method only looks at disk because it's a requirement that all
1187     jobs are present on disk (so in the _memcache we don't have any
1188     extra IDs).
1189
1190     @type sort: boolean
1191     @param sort: perform sorting on the returned job ids
1192     @rtype: list
1193     @return: the list of job IDs
1194
1195     """
1196     jlist = []
1197     for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1198       m = self._RE_JOB_FILE.match(filename)
1199       if m:
1200         jlist.append(m.group(1))
1201     if sort:
1202       jlist = utils.NiceSort(jlist)
1203     return jlist
1204
1205   def _LoadJobUnlocked(self, job_id):
1206     """Loads a job from the disk or memory.
1207
1208     Given a job id, this will return the cached job object if
1209     existing, or try to load the job from the disk. If loading from
1210     disk, it will also add the job to the cache.
1211
1212     @param job_id: the job id
1213     @rtype: L{_QueuedJob} or None
1214     @return: either None or the job object
1215
1216     """
1217     job = self._memcache.get(job_id, None)
1218     if job:
1219       logging.debug("Found job %s in memcache", job_id)
1220       return job
1221
1222     try:
1223       job = self._LoadJobFromDisk(job_id)
1224       if job is None:
1225         return job
1226     except errors.JobFileCorrupted:
1227       old_path = self._GetJobPath(job_id)
1228       new_path = self._GetArchivedJobPath(job_id)
1229       if old_path == new_path:
1230         # job already archived (future case)
1231         logging.exception("Can't parse job %s", job_id)
1232       else:
1233         # non-archived case
1234         logging.exception("Can't parse job %s, will archive.", job_id)
1235         self._RenameFilesUnlocked([(old_path, new_path)])
1236       return None
1237
1238     self._memcache[job_id] = job
1239     logging.debug("Added job %s to the cache", job_id)
1240     return job
1241
1242   def _LoadJobFromDisk(self, job_id):
1243     """Load the given job file from disk.
1244
1245     Given a job file, read, load and restore it in a _QueuedJob format.
1246
1247     @type job_id: string
1248     @param job_id: job identifier
1249     @rtype: L{_QueuedJob} or None
1250     @return: either None or the job object
1251
1252     """
1253     filepath = self._GetJobPath(job_id)
1254     logging.debug("Loading job from %s", filepath)
1255     try:
1256       raw_data = utils.ReadFile(filepath)
1257     except EnvironmentError, err:
1258       if err.errno in (errno.ENOENT, ):
1259         return None
1260       raise
1261
1262     try:
1263       data = serializer.LoadJson(raw_data)
1264       job = _QueuedJob.Restore(self, data)
1265     except Exception, err: # pylint: disable-msg=W0703
1266       raise errors.JobFileCorrupted(err)
1267
1268     return job
1269
1270   def SafeLoadJobFromDisk(self, job_id):
1271     """Load the given job file from disk.
1272
1273     Given a job file, read, load and restore it in a _QueuedJob format.
1274     In case of error reading the job, it gets returned as None, and the
1275     exception is logged.
1276
1277     @type job_id: string
1278     @param job_id: job identifier
1279     @rtype: L{_QueuedJob} or None
1280     @return: either None or the job object
1281
1282     """
1283     try:
1284       return self._LoadJobFromDisk(job_id)
1285     except (errors.JobFileCorrupted, EnvironmentError):
1286       logging.exception("Can't load/parse job %s", job_id)
1287       return None
1288
1289   @staticmethod
1290   def _IsQueueMarkedDrain():
1291     """Check if the queue is marked from drain.
1292
1293     This currently uses the queue drain file, which makes it a
1294     per-node flag. In the future this can be moved to the config file.
1295
1296     @rtype: boolean
1297     @return: True of the job queue is marked for draining
1298
1299     """
1300     return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1301
1302   def _UpdateQueueSizeUnlocked(self):
1303     """Update the queue size.
1304
1305     """
1306     self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1307
1308   @locking.ssynchronized(_LOCK)
1309   @_RequireOpenQueue
1310   def SetDrainFlag(self, drain_flag):
1311     """Sets the drain flag for the queue.
1312
1313     @type drain_flag: boolean
1314     @param drain_flag: Whether to set or unset the drain flag
1315
1316     """
1317     if drain_flag:
1318       utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1319     else:
1320       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1321
1322     self._drained = drain_flag
1323
1324     return True
1325
1326   @_RequireOpenQueue
1327   def _SubmitJobUnlocked(self, job_id, ops):
1328     """Create and store a new job.
1329
1330     This enters the job into our job queue and also puts it on the new
1331     queue, in order for it to be picked up by the queue processors.
1332
1333     @type job_id: job ID
1334     @param job_id: the job ID for the new job
1335     @type ops: list
1336     @param ops: The list of OpCodes that will become the new job.
1337     @rtype: L{_QueuedJob}
1338     @return: the job object to be queued
1339     @raise errors.JobQueueDrainError: if the job queue is marked for draining
1340     @raise errors.JobQueueFull: if the job queue has too many jobs in it
1341
1342     """
1343     # Ok when sharing the big job queue lock, as the drain file is created when
1344     # the lock is exclusive.
1345     if self._drained:
1346       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1347
1348     if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1349       raise errors.JobQueueFull()
1350
1351     job = _QueuedJob(self, job_id, ops)
1352
1353     # Write to disk
1354     self.UpdateJobUnlocked(job)
1355
1356     self._queue_size += 1
1357
1358     logging.debug("Adding new job %s to the cache", job_id)
1359     self._memcache[job_id] = job
1360
1361     return job
1362
1363   @locking.ssynchronized(_LOCK)
1364   @_RequireOpenQueue
1365   def SubmitJob(self, ops):
1366     """Create and store a new job.
1367
1368     @see: L{_SubmitJobUnlocked}
1369
1370     """
1371     job_id = self._NewSerialsUnlocked(1)[0]
1372     self._wpool.AddTask((self._SubmitJobUnlocked(job_id, ops), ))
1373     return job_id
1374
1375   @locking.ssynchronized(_LOCK)
1376   @_RequireOpenQueue
1377   def SubmitManyJobs(self, jobs):
1378     """Create and store multiple jobs.
1379
1380     @see: L{_SubmitJobUnlocked}
1381
1382     """
1383     results = []
1384     tasks = []
1385     all_job_ids = self._NewSerialsUnlocked(len(jobs))
1386     for job_id, ops in zip(all_job_ids, jobs):
1387       try:
1388         tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
1389         status = True
1390         data = job_id
1391       except errors.GenericError, err:
1392         data = str(err)
1393         status = False
1394       results.append((status, data))
1395     self._wpool.AddManyTasks(tasks)
1396
1397     return results
1398
1399   @_RequireOpenQueue
1400   def UpdateJobUnlocked(self, job, replicate=True):
1401     """Update a job's on disk storage.
1402
1403     After a job has been modified, this function needs to be called in
1404     order to write the changes to disk and replicate them to the other
1405     nodes.
1406
1407     @type job: L{_QueuedJob}
1408     @param job: the changed job
1409     @type replicate: boolean
1410     @param replicate: whether to replicate the change to remote nodes
1411
1412     """
1413     filename = self._GetJobPath(job.id)
1414     data = serializer.DumpJson(job.Serialize(), indent=False)
1415     logging.debug("Writing job %s to %s", job.id, filename)
1416     self._UpdateJobQueueFile(filename, data, replicate)
1417
1418   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1419                         timeout):
1420     """Waits for changes in a job.
1421
1422     @type job_id: string
1423     @param job_id: Job identifier
1424     @type fields: list of strings
1425     @param fields: Which fields to check for changes
1426     @type prev_job_info: list or None
1427     @param prev_job_info: Last job information returned
1428     @type prev_log_serial: int
1429     @param prev_log_serial: Last job message serial number
1430     @type timeout: float
1431     @param timeout: maximum time to wait in seconds
1432     @rtype: tuple (job info, log entries)
1433     @return: a tuple of the job information as required via
1434         the fields parameter, and the log entries as a list
1435
1436         if the job has not changed and the timeout has expired,
1437         we instead return a special value,
1438         L{constants.JOB_NOTCHANGED}, which should be interpreted
1439         as such by the clients
1440
1441     """
1442     load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
1443
1444     helper = _WaitForJobChangesHelper()
1445
1446     return helper(self._GetJobPath(job_id), load_fn,
1447                   fields, prev_job_info, prev_log_serial, timeout)
1448
1449   @locking.ssynchronized(_LOCK)
1450   @_RequireOpenQueue
1451   def CancelJob(self, job_id):
1452     """Cancels a job.
1453
1454     This will only succeed if the job has not started yet.
1455
1456     @type job_id: string
1457     @param job_id: job ID of job to be cancelled.
1458
1459     """
1460     logging.info("Cancelling job %s", job_id)
1461
1462     job = self._LoadJobUnlocked(job_id)
1463     if not job:
1464       logging.debug("Job %s not found", job_id)
1465       return (False, "Job %s not found" % job_id)
1466
1467     job_status = job.CalcStatus()
1468
1469     if job_status not in (constants.JOB_STATUS_QUEUED,
1470                           constants.JOB_STATUS_WAITLOCK):
1471       logging.debug("Job %s is no longer waiting in the queue", job.id)
1472       return (False, "Job %s is no longer waiting in the queue" % job.id)
1473
1474     if job_status == constants.JOB_STATUS_QUEUED:
1475       job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1476                             "Job canceled by request")
1477       return (True, "Job %s canceled" % job.id)
1478
1479     elif job_status == constants.JOB_STATUS_WAITLOCK:
1480       # The worker will notice the new status and cancel the job
1481       job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1482       return (True, "Job %s will be canceled" % job.id)
1483
1484   @_RequireOpenQueue
1485   def _ArchiveJobsUnlocked(self, jobs):
1486     """Archives jobs.
1487
1488     @type jobs: list of L{_QueuedJob}
1489     @param jobs: Job objects
1490     @rtype: int
1491     @return: Number of archived jobs
1492
1493     """
1494     archive_jobs = []
1495     rename_files = []
1496     for job in jobs:
1497       if job.CalcStatus() not in constants.JOBS_FINALIZED:
1498         logging.debug("Job %s is not yet done", job.id)
1499         continue
1500
1501       archive_jobs.append(job)
1502
1503       old = self._GetJobPath(job.id)
1504       new = self._GetArchivedJobPath(job.id)
1505       rename_files.append((old, new))
1506
1507     # TODO: What if 1..n files fail to rename?
1508     self._RenameFilesUnlocked(rename_files)
1509
1510     logging.debug("Successfully archived job(s) %s",
1511                   utils.CommaJoin(job.id for job in archive_jobs))
1512
1513     # Since we haven't quite checked, above, if we succeeded or failed renaming
1514     # the files, we update the cached queue size from the filesystem. When we
1515     # get around to fix the TODO: above, we can use the number of actually
1516     # archived jobs to fix this.
1517     self._UpdateQueueSizeUnlocked()
1518     return len(archive_jobs)
1519
1520   @locking.ssynchronized(_LOCK)
1521   @_RequireOpenQueue
1522   def ArchiveJob(self, job_id):
1523     """Archives a job.
1524
1525     This is just a wrapper over L{_ArchiveJobsUnlocked}.
1526
1527     @type job_id: string
1528     @param job_id: Job ID of job to be archived.
1529     @rtype: bool
1530     @return: Whether job was archived
1531
1532     """
1533     logging.info("Archiving job %s", job_id)
1534
1535     job = self._LoadJobUnlocked(job_id)
1536     if not job:
1537       logging.debug("Job %s not found", job_id)
1538       return False
1539
1540     return self._ArchiveJobsUnlocked([job]) == 1
1541
1542   @locking.ssynchronized(_LOCK)
1543   @_RequireOpenQueue
1544   def AutoArchiveJobs(self, age, timeout):
1545     """Archives all jobs based on age.
1546
1547     The method will archive all jobs which are older than the age
1548     parameter. For jobs that don't have an end timestamp, the start
1549     timestamp will be considered. The special '-1' age will cause
1550     archival of all jobs (that are not running or queued).
1551
1552     @type age: int
1553     @param age: the minimum age in seconds
1554
1555     """
1556     logging.info("Archiving jobs with age more than %s seconds", age)
1557
1558     now = time.time()
1559     end_time = now + timeout
1560     archived_count = 0
1561     last_touched = 0
1562
1563     all_job_ids = self._GetJobIDsUnlocked()
1564     pending = []
1565     for idx, job_id in enumerate(all_job_ids):
1566       last_touched = idx + 1
1567
1568       # Not optimal because jobs could be pending
1569       # TODO: Measure average duration for job archival and take number of
1570       # pending jobs into account.
1571       if time.time() > end_time:
1572         break
1573
1574       # Returns None if the job failed to load
1575       job = self._LoadJobUnlocked(job_id)
1576       if job:
1577         if job.end_timestamp is None:
1578           if job.start_timestamp is None:
1579             job_age = job.received_timestamp
1580           else:
1581             job_age = job.start_timestamp
1582         else:
1583           job_age = job.end_timestamp
1584
1585         if age == -1 or now - job_age[0] > age:
1586           pending.append(job)
1587
1588           # Archive 10 jobs at a time
1589           if len(pending) >= 10:
1590             archived_count += self._ArchiveJobsUnlocked(pending)
1591             pending = []
1592
1593     if pending:
1594       archived_count += self._ArchiveJobsUnlocked(pending)
1595
1596     return (archived_count, len(all_job_ids) - last_touched)
1597
1598   def QueryJobs(self, job_ids, fields):
1599     """Returns a list of jobs in queue.
1600
1601     @type job_ids: list
1602     @param job_ids: sequence of job identifiers or None for all
1603     @type fields: list
1604     @param fields: names of fields to return
1605     @rtype: list
1606     @return: list one element per job, each element being list with
1607         the requested fields
1608
1609     """
1610     jobs = []
1611     list_all = False
1612     if not job_ids:
1613       # Since files are added to/removed from the queue atomically, there's no
1614       # risk of getting the job ids in an inconsistent state.
1615       job_ids = self._GetJobIDsUnlocked()
1616       list_all = True
1617
1618     for job_id in job_ids:
1619       job = self.SafeLoadJobFromDisk(job_id)
1620       if job is not None:
1621         jobs.append(job.GetInfo(fields))
1622       elif not list_all:
1623         jobs.append(None)
1624
1625     return jobs
1626
1627   @locking.ssynchronized(_LOCK)
1628   @_RequireOpenQueue
1629   def Shutdown(self):
1630     """Stops the job queue.
1631
1632     This shutdowns all the worker threads an closes the queue.
1633
1634     """
1635     self._wpool.TerminateWorkers()
1636
1637     self._queue_filelock.Close()
1638     self._queue_filelock = None