Add a new NodeGroup config object
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import os
33 import logging
34 import errno
35 import re
36 import time
37 import weakref
38
39 try:
40   # pylint: disable-msg=E0611
41   from pyinotify import pyinotify
42 except ImportError:
43   import pyinotify
44
45 from ganeti import asyncnotifier
46 from ganeti import constants
47 from ganeti import serializer
48 from ganeti import workerpool
49 from ganeti import locking
50 from ganeti import opcodes
51 from ganeti import errors
52 from ganeti import mcpu
53 from ganeti import utils
54 from ganeti import jstore
55 from ganeti import rpc
56 from ganeti import runtime
57 from ganeti import netutils
58 from ganeti import compat
59
60
61 JOBQUEUE_THREADS = 25
62 JOBS_PER_ARCHIVE_DIRECTORY = 10000
63
64 # member lock names to be passed to @ssynchronized decorator
65 _LOCK = "_lock"
66 _QUEUE = "_queue"
67
68
69 class CancelJob(Exception):
70   """Special exception to cancel a job.
71
72   """
73
74
75 def TimeStampNow():
76   """Returns the current timestamp.
77
78   @rtype: tuple
79   @return: the current time in the (seconds, microseconds) format
80
81   """
82   return utils.SplitTime(time.time())
83
84
85 class _QueuedOpCode(object):
86   """Encapsulates an opcode object.
87
88   @ivar log: holds the execution log and consists of tuples
89   of the form C{(log_serial, timestamp, level, message)}
90   @ivar input: the OpCode we encapsulate
91   @ivar status: the current status
92   @ivar result: the result of the LU execution
93   @ivar start_timestamp: timestamp for the start of the execution
94   @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
95   @ivar stop_timestamp: timestamp for the end of the execution
96
97   """
98   __slots__ = ["input", "status", "result", "log", "priority",
99                "start_timestamp", "exec_timestamp", "end_timestamp",
100                "__weakref__"]
101
102   def __init__(self, op):
103     """Constructor for the _QuededOpCode.
104
105     @type op: L{opcodes.OpCode}
106     @param op: the opcode we encapsulate
107
108     """
109     self.input = op
110     self.status = constants.OP_STATUS_QUEUED
111     self.result = None
112     self.log = []
113     self.start_timestamp = None
114     self.exec_timestamp = None
115     self.end_timestamp = None
116
117     # Get initial priority (it might change during the lifetime of this opcode)
118     self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
119
120   @classmethod
121   def Restore(cls, state):
122     """Restore the _QueuedOpCode from the serialized form.
123
124     @type state: dict
125     @param state: the serialized state
126     @rtype: _QueuedOpCode
127     @return: a new _QueuedOpCode instance
128
129     """
130     obj = _QueuedOpCode.__new__(cls)
131     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
132     obj.status = state["status"]
133     obj.result = state["result"]
134     obj.log = state["log"]
135     obj.start_timestamp = state.get("start_timestamp", None)
136     obj.exec_timestamp = state.get("exec_timestamp", None)
137     obj.end_timestamp = state.get("end_timestamp", None)
138     obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
139     return obj
140
141   def Serialize(self):
142     """Serializes this _QueuedOpCode.
143
144     @rtype: dict
145     @return: the dictionary holding the serialized state
146
147     """
148     return {
149       "input": self.input.__getstate__(),
150       "status": self.status,
151       "result": self.result,
152       "log": self.log,
153       "start_timestamp": self.start_timestamp,
154       "exec_timestamp": self.exec_timestamp,
155       "end_timestamp": self.end_timestamp,
156       "priority": self.priority,
157       }
158
159
160 class _QueuedJob(object):
161   """In-memory job representation.
162
163   This is what we use to track the user-submitted jobs. Locking must
164   be taken care of by users of this class.
165
166   @type queue: L{JobQueue}
167   @ivar queue: the parent queue
168   @ivar id: the job ID
169   @type ops: list
170   @ivar ops: the list of _QueuedOpCode that constitute the job
171   @type log_serial: int
172   @ivar log_serial: holds the index for the next log entry
173   @ivar received_timestamp: the timestamp for when the job was received
174   @ivar start_timestmap: the timestamp for start of execution
175   @ivar end_timestamp: the timestamp for end of execution
176
177   """
178   # pylint: disable-msg=W0212
179   __slots__ = ["queue", "id", "ops", "log_serial",
180                "received_timestamp", "start_timestamp", "end_timestamp",
181                "__weakref__"]
182
183   def __init__(self, queue, job_id, ops):
184     """Constructor for the _QueuedJob.
185
186     @type queue: L{JobQueue}
187     @param queue: our parent queue
188     @type job_id: job_id
189     @param job_id: our job id
190     @type ops: list
191     @param ops: the list of opcodes we hold, which will be encapsulated
192         in _QueuedOpCodes
193
194     """
195     if not ops:
196       raise errors.GenericError("A job needs at least one opcode")
197
198     self.queue = queue
199     self.id = job_id
200     self.ops = [_QueuedOpCode(op) for op in ops]
201     self.log_serial = 0
202     self.received_timestamp = TimeStampNow()
203     self.start_timestamp = None
204     self.end_timestamp = None
205
206   def __repr__(self):
207     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
208               "id=%s" % self.id,
209               "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
210
211     return "<%s at %#x>" % (" ".join(status), id(self))
212
213   @classmethod
214   def Restore(cls, queue, state):
215     """Restore a _QueuedJob from serialized state:
216
217     @type queue: L{JobQueue}
218     @param queue: to which queue the restored job belongs
219     @type state: dict
220     @param state: the serialized state
221     @rtype: _JobQueue
222     @return: the restored _JobQueue instance
223
224     """
225     obj = _QueuedJob.__new__(cls)
226     obj.queue = queue
227     obj.id = state["id"]
228     obj.received_timestamp = state.get("received_timestamp", None)
229     obj.start_timestamp = state.get("start_timestamp", None)
230     obj.end_timestamp = state.get("end_timestamp", None)
231
232     obj.ops = []
233     obj.log_serial = 0
234     for op_state in state["ops"]:
235       op = _QueuedOpCode.Restore(op_state)
236       for log_entry in op.log:
237         obj.log_serial = max(obj.log_serial, log_entry[0])
238       obj.ops.append(op)
239
240     return obj
241
242   def Serialize(self):
243     """Serialize the _JobQueue instance.
244
245     @rtype: dict
246     @return: the serialized state
247
248     """
249     return {
250       "id": self.id,
251       "ops": [op.Serialize() for op in self.ops],
252       "start_timestamp": self.start_timestamp,
253       "end_timestamp": self.end_timestamp,
254       "received_timestamp": self.received_timestamp,
255       }
256
257   def CalcStatus(self):
258     """Compute the status of this job.
259
260     This function iterates over all the _QueuedOpCodes in the job and
261     based on their status, computes the job status.
262
263     The algorithm is:
264       - if we find a cancelled, or finished with error, the job
265         status will be the same
266       - otherwise, the last opcode with the status one of:
267           - waitlock
268           - canceling
269           - running
270
271         will determine the job status
272
273       - otherwise, it means either all opcodes are queued, or success,
274         and the job status will be the same
275
276     @return: the job status
277
278     """
279     status = constants.JOB_STATUS_QUEUED
280
281     all_success = True
282     for op in self.ops:
283       if op.status == constants.OP_STATUS_SUCCESS:
284         continue
285
286       all_success = False
287
288       if op.status == constants.OP_STATUS_QUEUED:
289         pass
290       elif op.status == constants.OP_STATUS_WAITLOCK:
291         status = constants.JOB_STATUS_WAITLOCK
292       elif op.status == constants.OP_STATUS_RUNNING:
293         status = constants.JOB_STATUS_RUNNING
294       elif op.status == constants.OP_STATUS_CANCELING:
295         status = constants.JOB_STATUS_CANCELING
296         break
297       elif op.status == constants.OP_STATUS_ERROR:
298         status = constants.JOB_STATUS_ERROR
299         # The whole job fails if one opcode failed
300         break
301       elif op.status == constants.OP_STATUS_CANCELED:
302         status = constants.OP_STATUS_CANCELED
303         break
304
305     if all_success:
306       status = constants.JOB_STATUS_SUCCESS
307
308     return status
309
310   def CalcPriority(self):
311     """Gets the current priority for this job.
312
313     Only unfinished opcodes are considered. When all are done, the default
314     priority is used.
315
316     @rtype: int
317
318     """
319     priorities = [op.priority for op in self.ops
320                   if op.status not in constants.OPS_FINALIZED]
321
322     if not priorities:
323       # All opcodes are done, assume default priority
324       return constants.OP_PRIO_DEFAULT
325
326     return min(priorities)
327
328   def GetLogEntries(self, newer_than):
329     """Selectively returns the log entries.
330
331     @type newer_than: None or int
332     @param newer_than: if this is None, return all log entries,
333         otherwise return only the log entries with serial higher
334         than this value
335     @rtype: list
336     @return: the list of the log entries selected
337
338     """
339     if newer_than is None:
340       serial = -1
341     else:
342       serial = newer_than
343
344     entries = []
345     for op in self.ops:
346       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
347
348     return entries
349
350   def GetInfo(self, fields):
351     """Returns information about a job.
352
353     @type fields: list
354     @param fields: names of fields to return
355     @rtype: list
356     @return: list with one element for each field
357     @raise errors.OpExecError: when an invalid field
358         has been passed
359
360     """
361     row = []
362     for fname in fields:
363       if fname == "id":
364         row.append(self.id)
365       elif fname == "status":
366         row.append(self.CalcStatus())
367       elif fname == "ops":
368         row.append([op.input.__getstate__() for op in self.ops])
369       elif fname == "opresult":
370         row.append([op.result for op in self.ops])
371       elif fname == "opstatus":
372         row.append([op.status for op in self.ops])
373       elif fname == "oplog":
374         row.append([op.log for op in self.ops])
375       elif fname == "opstart":
376         row.append([op.start_timestamp for op in self.ops])
377       elif fname == "opexec":
378         row.append([op.exec_timestamp for op in self.ops])
379       elif fname == "opend":
380         row.append([op.end_timestamp for op in self.ops])
381       elif fname == "received_ts":
382         row.append(self.received_timestamp)
383       elif fname == "start_ts":
384         row.append(self.start_timestamp)
385       elif fname == "end_ts":
386         row.append(self.end_timestamp)
387       elif fname == "summary":
388         row.append([op.input.Summary() for op in self.ops])
389       else:
390         raise errors.OpExecError("Invalid self query field '%s'" % fname)
391     return row
392
393   def MarkUnfinishedOps(self, status, result):
394     """Mark unfinished opcodes with a given status and result.
395
396     This is an utility function for marking all running or waiting to
397     be run opcodes with a given status. Opcodes which are already
398     finalised are not changed.
399
400     @param status: a given opcode status
401     @param result: the opcode result
402
403     """
404     not_marked = True
405     for op in self.ops:
406       if op.status in constants.OPS_FINALIZED:
407         assert not_marked, "Finalized opcodes found after non-finalized ones"
408         continue
409       op.status = status
410       op.result = result
411       not_marked = False
412
413
414 class _OpExecCallbacks(mcpu.OpExecCbBase):
415   def __init__(self, queue, job, op):
416     """Initializes this class.
417
418     @type queue: L{JobQueue}
419     @param queue: Job queue
420     @type job: L{_QueuedJob}
421     @param job: Job object
422     @type op: L{_QueuedOpCode}
423     @param op: OpCode
424
425     """
426     assert queue, "Queue is missing"
427     assert job, "Job is missing"
428     assert op, "Opcode is missing"
429
430     self._queue = queue
431     self._job = job
432     self._op = op
433
434   def _CheckCancel(self):
435     """Raises an exception to cancel the job if asked to.
436
437     """
438     # Cancel here if we were asked to
439     if self._op.status == constants.OP_STATUS_CANCELING:
440       logging.debug("Canceling opcode")
441       raise CancelJob()
442
443   @locking.ssynchronized(_QUEUE, shared=1)
444   def NotifyStart(self):
445     """Mark the opcode as running, not lock-waiting.
446
447     This is called from the mcpu code as a notifier function, when the LU is
448     finally about to start the Exec() method. Of course, to have end-user
449     visible results, the opcode must be initially (before calling into
450     Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
451
452     """
453     assert self._op in self._job.ops
454     assert self._op.status in (constants.OP_STATUS_WAITLOCK,
455                                constants.OP_STATUS_CANCELING)
456
457     # Cancel here if we were asked to
458     self._CheckCancel()
459
460     logging.debug("Opcode is now running")
461
462     self._op.status = constants.OP_STATUS_RUNNING
463     self._op.exec_timestamp = TimeStampNow()
464
465     # And finally replicate the job status
466     self._queue.UpdateJobUnlocked(self._job)
467
468   @locking.ssynchronized(_QUEUE, shared=1)
469   def _AppendFeedback(self, timestamp, log_type, log_msg):
470     """Internal feedback append function, with locks
471
472     """
473     self._job.log_serial += 1
474     self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
475     self._queue.UpdateJobUnlocked(self._job, replicate=False)
476
477   def Feedback(self, *args):
478     """Append a log entry.
479
480     """
481     assert len(args) < 3
482
483     if len(args) == 1:
484       log_type = constants.ELOG_MESSAGE
485       log_msg = args[0]
486     else:
487       (log_type, log_msg) = args
488
489     # The time is split to make serialization easier and not lose
490     # precision.
491     timestamp = utils.SplitTime(time.time())
492     self._AppendFeedback(timestamp, log_type, log_msg)
493
494   def ReportLocks(self, msg):
495     """Write locking information to the job.
496
497     Called whenever the LU processor is waiting for a lock or has acquired one.
498
499     """
500     assert self._op.status in (constants.OP_STATUS_WAITLOCK,
501                                constants.OP_STATUS_CANCELING)
502
503     # Cancel here if we were asked to
504     self._CheckCancel()
505
506
507 class _JobChangesChecker(object):
508   def __init__(self, fields, prev_job_info, prev_log_serial):
509     """Initializes this class.
510
511     @type fields: list of strings
512     @param fields: Fields requested by LUXI client
513     @type prev_job_info: string
514     @param prev_job_info: previous job info, as passed by the LUXI client
515     @type prev_log_serial: string
516     @param prev_log_serial: previous job serial, as passed by the LUXI client
517
518     """
519     self._fields = fields
520     self._prev_job_info = prev_job_info
521     self._prev_log_serial = prev_log_serial
522
523   def __call__(self, job):
524     """Checks whether job has changed.
525
526     @type job: L{_QueuedJob}
527     @param job: Job object
528
529     """
530     status = job.CalcStatus()
531     job_info = job.GetInfo(self._fields)
532     log_entries = job.GetLogEntries(self._prev_log_serial)
533
534     # Serializing and deserializing data can cause type changes (e.g. from
535     # tuple to list) or precision loss. We're doing it here so that we get
536     # the same modifications as the data received from the client. Without
537     # this, the comparison afterwards might fail without the data being
538     # significantly different.
539     # TODO: we just deserialized from disk, investigate how to make sure that
540     # the job info and log entries are compatible to avoid this further step.
541     # TODO: Doing something like in testutils.py:UnifyValueType might be more
542     # efficient, though floats will be tricky
543     job_info = serializer.LoadJson(serializer.DumpJson(job_info))
544     log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
545
546     # Don't even try to wait if the job is no longer running, there will be
547     # no changes.
548     if (status not in (constants.JOB_STATUS_QUEUED,
549                        constants.JOB_STATUS_RUNNING,
550                        constants.JOB_STATUS_WAITLOCK) or
551         job_info != self._prev_job_info or
552         (log_entries and self._prev_log_serial != log_entries[0][0])):
553       logging.debug("Job %s changed", job.id)
554       return (job_info, log_entries)
555
556     return None
557
558
559 class _JobFileChangesWaiter(object):
560   def __init__(self, filename):
561     """Initializes this class.
562
563     @type filename: string
564     @param filename: Path to job file
565     @raises errors.InotifyError: if the notifier cannot be setup
566
567     """
568     self._wm = pyinotify.WatchManager()
569     self._inotify_handler = \
570       asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
571     self._notifier = \
572       pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
573     try:
574       self._inotify_handler.enable()
575     except Exception:
576       # pyinotify doesn't close file descriptors automatically
577       self._notifier.stop()
578       raise
579
580   def _OnInotify(self, notifier_enabled):
581     """Callback for inotify.
582
583     """
584     if not notifier_enabled:
585       self._inotify_handler.enable()
586
587   def Wait(self, timeout):
588     """Waits for the job file to change.
589
590     @type timeout: float
591     @param timeout: Timeout in seconds
592     @return: Whether there have been events
593
594     """
595     assert timeout >= 0
596     have_events = self._notifier.check_events(timeout * 1000)
597     if have_events:
598       self._notifier.read_events()
599     self._notifier.process_events()
600     return have_events
601
602   def Close(self):
603     """Closes underlying notifier and its file descriptor.
604
605     """
606     self._notifier.stop()
607
608
609 class _JobChangesWaiter(object):
610   def __init__(self, filename):
611     """Initializes this class.
612
613     @type filename: string
614     @param filename: Path to job file
615
616     """
617     self._filewaiter = None
618     self._filename = filename
619
620   def Wait(self, timeout):
621     """Waits for a job to change.
622
623     @type timeout: float
624     @param timeout: Timeout in seconds
625     @return: Whether there have been events
626
627     """
628     if self._filewaiter:
629       return self._filewaiter.Wait(timeout)
630
631     # Lazy setup: Avoid inotify setup cost when job file has already changed.
632     # If this point is reached, return immediately and let caller check the job
633     # file again in case there were changes since the last check. This avoids a
634     # race condition.
635     self._filewaiter = _JobFileChangesWaiter(self._filename)
636
637     return True
638
639   def Close(self):
640     """Closes underlying waiter.
641
642     """
643     if self._filewaiter:
644       self._filewaiter.Close()
645
646
647 class _WaitForJobChangesHelper(object):
648   """Helper class using inotify to wait for changes in a job file.
649
650   This class takes a previous job status and serial, and alerts the client when
651   the current job status has changed.
652
653   """
654   @staticmethod
655   def _CheckForChanges(job_load_fn, check_fn):
656     job = job_load_fn()
657     if not job:
658       raise errors.JobLost()
659
660     result = check_fn(job)
661     if result is None:
662       raise utils.RetryAgain()
663
664     return result
665
666   def __call__(self, filename, job_load_fn,
667                fields, prev_job_info, prev_log_serial, timeout):
668     """Waits for changes on a job.
669
670     @type filename: string
671     @param filename: File on which to wait for changes
672     @type job_load_fn: callable
673     @param job_load_fn: Function to load job
674     @type fields: list of strings
675     @param fields: Which fields to check for changes
676     @type prev_job_info: list or None
677     @param prev_job_info: Last job information returned
678     @type prev_log_serial: int
679     @param prev_log_serial: Last job message serial number
680     @type timeout: float
681     @param timeout: maximum time to wait in seconds
682
683     """
684     try:
685       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
686       waiter = _JobChangesWaiter(filename)
687       try:
688         return utils.Retry(compat.partial(self._CheckForChanges,
689                                           job_load_fn, check_fn),
690                            utils.RETRY_REMAINING_TIME, timeout,
691                            wait_fn=waiter.Wait)
692       finally:
693         waiter.Close()
694     except (errors.InotifyError, errors.JobLost):
695       return None
696     except utils.RetryTimeout:
697       return constants.JOB_NOTCHANGED
698
699
700 def _EncodeOpError(err):
701   """Encodes an error which occurred while processing an opcode.
702
703   """
704   if isinstance(err, errors.GenericError):
705     to_encode = err
706   else:
707     to_encode = errors.OpExecError(str(err))
708
709   return errors.EncodeException(to_encode)
710
711
712 class _JobQueueWorker(workerpool.BaseWorker):
713   """The actual job workers.
714
715   """
716   def RunTask(self, job): # pylint: disable-msg=W0221
717     """Job executor.
718
719     This functions processes a job. It is closely tied to the _QueuedJob and
720     _QueuedOpCode classes.
721
722     @type job: L{_QueuedJob}
723     @param job: the job to be processed
724
725     """
726     self.SetTaskName("Job%s" % job.id)
727
728     logging.info("Processing job %s", job.id)
729     proc = mcpu.Processor(self.pool.queue.context, job.id)
730     queue = job.queue
731     try:
732       try:
733         count = len(job.ops)
734         for idx, op in enumerate(job.ops):
735           op_summary = op.input.Summary()
736           if op.status == constants.OP_STATUS_SUCCESS:
737             # this is a job that was partially completed before master
738             # daemon shutdown, so it can be expected that some opcodes
739             # are already completed successfully (if any did error
740             # out, then the whole job should have been aborted and not
741             # resubmitted for processing)
742             logging.info("Op %s/%s: opcode %s already processed, skipping",
743                          idx + 1, count, op_summary)
744             continue
745           try:
746             logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
747                          op_summary)
748
749             queue.acquire(shared=1)
750             try:
751               if op.status == constants.OP_STATUS_CANCELED:
752                 logging.debug("Canceling opcode")
753                 raise CancelJob()
754               assert op.status == constants.OP_STATUS_QUEUED
755               logging.debug("Opcode %s/%s waiting for locks",
756                             idx + 1, count)
757               op.status = constants.OP_STATUS_WAITLOCK
758               op.result = None
759               op.start_timestamp = TimeStampNow()
760               if idx == 0: # first opcode
761                 job.start_timestamp = op.start_timestamp
762               queue.UpdateJobUnlocked(job)
763
764               input_opcode = op.input
765             finally:
766               queue.release()
767
768             # Make sure not to hold queue lock while calling ExecOpCode
769             result = proc.ExecOpCode(input_opcode,
770                                      _OpExecCallbacks(queue, job, op))
771
772             queue.acquire(shared=1)
773             try:
774               logging.debug("Opcode %s/%s succeeded", idx + 1, count)
775               op.status = constants.OP_STATUS_SUCCESS
776               op.result = result
777               op.end_timestamp = TimeStampNow()
778               if idx == count - 1:
779                 job.end_timestamp = TimeStampNow()
780
781                 # Consistency check
782                 assert compat.all(i.status == constants.OP_STATUS_SUCCESS
783                                   for i in job.ops)
784
785               queue.UpdateJobUnlocked(job)
786             finally:
787               queue.release()
788
789             logging.info("Op %s/%s: Successfully finished opcode %s",
790                          idx + 1, count, op_summary)
791           except CancelJob:
792             # Will be handled further up
793             raise
794           except Exception, err:
795             queue.acquire(shared=1)
796             try:
797               try:
798                 logging.debug("Opcode %s/%s failed", idx + 1, count)
799                 op.status = constants.OP_STATUS_ERROR
800                 op.result = _EncodeOpError(err)
801                 op.end_timestamp = TimeStampNow()
802                 logging.info("Op %s/%s: Error in opcode %s: %s",
803                              idx + 1, count, op_summary, err)
804
805                 to_encode = errors.OpExecError("Preceding opcode failed")
806                 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
807                                       _EncodeOpError(to_encode))
808
809                 # Consistency check
810                 assert compat.all(i.status == constants.OP_STATUS_SUCCESS
811                                   for i in job.ops[:idx])
812                 assert compat.all(i.status == constants.OP_STATUS_ERROR and
813                                   errors.GetEncodedError(i.result)
814                                   for i in job.ops[idx:])
815               finally:
816                 job.end_timestamp = TimeStampNow()
817                 queue.UpdateJobUnlocked(job)
818             finally:
819               queue.release()
820             raise
821
822       except CancelJob:
823         queue.acquire(shared=1)
824         try:
825           job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
826                                 "Job canceled by request")
827           job.end_timestamp = TimeStampNow()
828           queue.UpdateJobUnlocked(job)
829         finally:
830           queue.release()
831       except errors.GenericError, err:
832         logging.exception("Ganeti exception")
833       except:
834         logging.exception("Unhandled exception")
835     finally:
836       status = job.CalcStatus()
837       logging.info("Finished job %s, status = %s", job.id, status)
838
839
840 class _JobQueueWorkerPool(workerpool.WorkerPool):
841   """Simple class implementing a job-processing workerpool.
842
843   """
844   def __init__(self, queue):
845     super(_JobQueueWorkerPool, self).__init__("JobQueue",
846                                               JOBQUEUE_THREADS,
847                                               _JobQueueWorker)
848     self.queue = queue
849
850
851 def _RequireOpenQueue(fn):
852   """Decorator for "public" functions.
853
854   This function should be used for all 'public' functions. That is,
855   functions usually called from other classes. Note that this should
856   be applied only to methods (not plain functions), since it expects
857   that the decorated function is called with a first argument that has
858   a '_queue_filelock' argument.
859
860   @warning: Use this decorator only after locking.ssynchronized
861
862   Example::
863     @locking.ssynchronized(_LOCK)
864     @_RequireOpenQueue
865     def Example(self):
866       pass
867
868   """
869   def wrapper(self, *args, **kwargs):
870     # pylint: disable-msg=W0212
871     assert self._queue_filelock is not None, "Queue should be open"
872     return fn(self, *args, **kwargs)
873   return wrapper
874
875
876 class JobQueue(object):
877   """Queue used to manage the jobs.
878
879   @cvar _RE_JOB_FILE: regex matching the valid job file names
880
881   """
882   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
883
884   def __init__(self, context):
885     """Constructor for JobQueue.
886
887     The constructor will initialize the job queue object and then
888     start loading the current jobs from disk, either for starting them
889     (if they were queue) or for aborting them (if they were already
890     running).
891
892     @type context: GanetiContext
893     @param context: the context object for access to the configuration
894         data and other ganeti objects
895
896     """
897     self.context = context
898     self._memcache = weakref.WeakValueDictionary()
899     self._my_hostname = netutils.Hostname.GetSysName()
900
901     # The Big JobQueue lock. If a code block or method acquires it in shared
902     # mode safe it must guarantee concurrency with all the code acquiring it in
903     # shared mode, including itself. In order not to acquire it at all
904     # concurrency must be guaranteed with all code acquiring it in shared mode
905     # and all code acquiring it exclusively.
906     self._lock = locking.SharedLock("JobQueue")
907
908     self.acquire = self._lock.acquire
909     self.release = self._lock.release
910
911     # Initialize the queue, and acquire the filelock.
912     # This ensures no other process is working on the job queue.
913     self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
914
915     # Read serial file
916     self._last_serial = jstore.ReadSerial()
917     assert self._last_serial is not None, ("Serial file was modified between"
918                                            " check in jstore and here")
919
920     # Get initial list of nodes
921     self._nodes = dict((n.name, n.primary_ip)
922                        for n in self.context.cfg.GetAllNodesInfo().values()
923                        if n.master_candidate)
924
925     # Remove master node
926     self._nodes.pop(self._my_hostname, None)
927
928     # TODO: Check consistency across nodes
929
930     self._queue_size = 0
931     self._UpdateQueueSizeUnlocked()
932     self._drained = self._IsQueueMarkedDrain()
933
934     # Setup worker pool
935     self._wpool = _JobQueueWorkerPool(self)
936     try:
937       self._InspectQueue()
938     except:
939       self._wpool.TerminateWorkers()
940       raise
941
942   @locking.ssynchronized(_LOCK)
943   @_RequireOpenQueue
944   def _InspectQueue(self):
945     """Loads the whole job queue and resumes unfinished jobs.
946
947     This function needs the lock here because WorkerPool.AddTask() may start a
948     job while we're still doing our work.
949
950     """
951     logging.info("Inspecting job queue")
952
953     all_job_ids = self._GetJobIDsUnlocked()
954     jobs_count = len(all_job_ids)
955     lastinfo = time.time()
956     for idx, job_id in enumerate(all_job_ids):
957       # Give an update every 1000 jobs or 10 seconds
958       if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
959           idx == (jobs_count - 1)):
960         logging.info("Job queue inspection: %d/%d (%0.1f %%)",
961                      idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
962         lastinfo = time.time()
963
964       job = self._LoadJobUnlocked(job_id)
965
966       # a failure in loading the job can cause 'None' to be returned
967       if job is None:
968         continue
969
970       status = job.CalcStatus()
971
972       if status in (constants.JOB_STATUS_QUEUED,
973                     constants.JOB_STATUS_WAITLOCK):
974         self._wpool.AddTask((job, ))
975
976       elif status in (constants.JOB_STATUS_RUNNING,
977                       constants.JOB_STATUS_CANCELING):
978         logging.warning("Unfinished job %s found: %s", job.id, job)
979         job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
980                               "Unclean master daemon shutdown")
981         self.UpdateJobUnlocked(job)
982
983     logging.info("Job queue inspection finished")
984
985   @locking.ssynchronized(_LOCK)
986   @_RequireOpenQueue
987   def AddNode(self, node):
988     """Register a new node with the queue.
989
990     @type node: L{objects.Node}
991     @param node: the node object to be added
992
993     """
994     node_name = node.name
995     assert node_name != self._my_hostname
996
997     # Clean queue directory on added node
998     result = rpc.RpcRunner.call_jobqueue_purge(node_name)
999     msg = result.fail_msg
1000     if msg:
1001       logging.warning("Cannot cleanup queue directory on node %s: %s",
1002                       node_name, msg)
1003
1004     if not node.master_candidate:
1005       # remove if existing, ignoring errors
1006       self._nodes.pop(node_name, None)
1007       # and skip the replication of the job ids
1008       return
1009
1010     # Upload the whole queue excluding archived jobs
1011     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1012
1013     # Upload current serial file
1014     files.append(constants.JOB_QUEUE_SERIAL_FILE)
1015
1016     for file_name in files:
1017       # Read file content
1018       content = utils.ReadFile(file_name)
1019
1020       result = rpc.RpcRunner.call_jobqueue_update([node_name],
1021                                                   [node.primary_ip],
1022                                                   file_name, content)
1023       msg = result[node_name].fail_msg
1024       if msg:
1025         logging.error("Failed to upload file %s to node %s: %s",
1026                       file_name, node_name, msg)
1027
1028     self._nodes[node_name] = node.primary_ip
1029
1030   @locking.ssynchronized(_LOCK)
1031   @_RequireOpenQueue
1032   def RemoveNode(self, node_name):
1033     """Callback called when removing nodes from the cluster.
1034
1035     @type node_name: str
1036     @param node_name: the name of the node to remove
1037
1038     """
1039     self._nodes.pop(node_name, None)
1040
1041   @staticmethod
1042   def _CheckRpcResult(result, nodes, failmsg):
1043     """Verifies the status of an RPC call.
1044
1045     Since we aim to keep consistency should this node (the current
1046     master) fail, we will log errors if our rpc fail, and especially
1047     log the case when more than half of the nodes fails.
1048
1049     @param result: the data as returned from the rpc call
1050     @type nodes: list
1051     @param nodes: the list of nodes we made the call to
1052     @type failmsg: str
1053     @param failmsg: the identifier to be used for logging
1054
1055     """
1056     failed = []
1057     success = []
1058
1059     for node in nodes:
1060       msg = result[node].fail_msg
1061       if msg:
1062         failed.append(node)
1063         logging.error("RPC call %s (%s) failed on node %s: %s",
1064                       result[node].call, failmsg, node, msg)
1065       else:
1066         success.append(node)
1067
1068     # +1 for the master node
1069     if (len(success) + 1) < len(failed):
1070       # TODO: Handle failing nodes
1071       logging.error("More than half of the nodes failed")
1072
1073   def _GetNodeIp(self):
1074     """Helper for returning the node name/ip list.
1075
1076     @rtype: (list, list)
1077     @return: a tuple of two lists, the first one with the node
1078         names and the second one with the node addresses
1079
1080     """
1081     # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1082     name_list = self._nodes.keys()
1083     addr_list = [self._nodes[name] for name in name_list]
1084     return name_list, addr_list
1085
1086   def _UpdateJobQueueFile(self, file_name, data, replicate):
1087     """Writes a file locally and then replicates it to all nodes.
1088
1089     This function will replace the contents of a file on the local
1090     node and then replicate it to all the other nodes we have.
1091
1092     @type file_name: str
1093     @param file_name: the path of the file to be replicated
1094     @type data: str
1095     @param data: the new contents of the file
1096     @type replicate: boolean
1097     @param replicate: whether to spread the changes to the remote nodes
1098
1099     """
1100     getents = runtime.GetEnts()
1101     utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1102                     gid=getents.masterd_gid)
1103
1104     if replicate:
1105       names, addrs = self._GetNodeIp()
1106       result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1107       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1108
1109   def _RenameFilesUnlocked(self, rename):
1110     """Renames a file locally and then replicate the change.
1111
1112     This function will rename a file in the local queue directory
1113     and then replicate this rename to all the other nodes we have.
1114
1115     @type rename: list of (old, new)
1116     @param rename: List containing tuples mapping old to new names
1117
1118     """
1119     # Rename them locally
1120     for old, new in rename:
1121       utils.RenameFile(old, new, mkdir=True)
1122
1123     # ... and on all nodes
1124     names, addrs = self._GetNodeIp()
1125     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1126     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1127
1128   @staticmethod
1129   def _FormatJobID(job_id):
1130     """Convert a job ID to string format.
1131
1132     Currently this just does C{str(job_id)} after performing some
1133     checks, but if we want to change the job id format this will
1134     abstract this change.
1135
1136     @type job_id: int or long
1137     @param job_id: the numeric job id
1138     @rtype: str
1139     @return: the formatted job id
1140
1141     """
1142     if not isinstance(job_id, (int, long)):
1143       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1144     if job_id < 0:
1145       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1146
1147     return str(job_id)
1148
1149   @classmethod
1150   def _GetArchiveDirectory(cls, job_id):
1151     """Returns the archive directory for a job.
1152
1153     @type job_id: str
1154     @param job_id: Job identifier
1155     @rtype: str
1156     @return: Directory name
1157
1158     """
1159     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1160
1161   def _NewSerialsUnlocked(self, count):
1162     """Generates a new job identifier.
1163
1164     Job identifiers are unique during the lifetime of a cluster.
1165
1166     @type count: integer
1167     @param count: how many serials to return
1168     @rtype: str
1169     @return: a string representing the job identifier.
1170
1171     """
1172     assert count > 0
1173     # New number
1174     serial = self._last_serial + count
1175
1176     # Write to file
1177     self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1178                              "%s\n" % serial, True)
1179
1180     result = [self._FormatJobID(v)
1181               for v in range(self._last_serial, serial + 1)]
1182     # Keep it only if we were able to write the file
1183     self._last_serial = serial
1184
1185     return result
1186
1187   @staticmethod
1188   def _GetJobPath(job_id):
1189     """Returns the job file for a given job id.
1190
1191     @type job_id: str
1192     @param job_id: the job identifier
1193     @rtype: str
1194     @return: the path to the job file
1195
1196     """
1197     return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1198
1199   @classmethod
1200   def _GetArchivedJobPath(cls, job_id):
1201     """Returns the archived job file for a give job id.
1202
1203     @type job_id: str
1204     @param job_id: the job identifier
1205     @rtype: str
1206     @return: the path to the archived job file
1207
1208     """
1209     return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1210                           cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1211
1212   def _GetJobIDsUnlocked(self, sort=True):
1213     """Return all known job IDs.
1214
1215     The method only looks at disk because it's a requirement that all
1216     jobs are present on disk (so in the _memcache we don't have any
1217     extra IDs).
1218
1219     @type sort: boolean
1220     @param sort: perform sorting on the returned job ids
1221     @rtype: list
1222     @return: the list of job IDs
1223
1224     """
1225     jlist = []
1226     for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1227       m = self._RE_JOB_FILE.match(filename)
1228       if m:
1229         jlist.append(m.group(1))
1230     if sort:
1231       jlist = utils.NiceSort(jlist)
1232     return jlist
1233
1234   def _LoadJobUnlocked(self, job_id):
1235     """Loads a job from the disk or memory.
1236
1237     Given a job id, this will return the cached job object if
1238     existing, or try to load the job from the disk. If loading from
1239     disk, it will also add the job to the cache.
1240
1241     @param job_id: the job id
1242     @rtype: L{_QueuedJob} or None
1243     @return: either None or the job object
1244
1245     """
1246     job = self._memcache.get(job_id, None)
1247     if job:
1248       logging.debug("Found job %s in memcache", job_id)
1249       return job
1250
1251     try:
1252       job = self._LoadJobFromDisk(job_id)
1253       if job is None:
1254         return job
1255     except errors.JobFileCorrupted:
1256       old_path = self._GetJobPath(job_id)
1257       new_path = self._GetArchivedJobPath(job_id)
1258       if old_path == new_path:
1259         # job already archived (future case)
1260         logging.exception("Can't parse job %s", job_id)
1261       else:
1262         # non-archived case
1263         logging.exception("Can't parse job %s, will archive.", job_id)
1264         self._RenameFilesUnlocked([(old_path, new_path)])
1265       return None
1266
1267     self._memcache[job_id] = job
1268     logging.debug("Added job %s to the cache", job_id)
1269     return job
1270
1271   def _LoadJobFromDisk(self, job_id):
1272     """Load the given job file from disk.
1273
1274     Given a job file, read, load and restore it in a _QueuedJob format.
1275
1276     @type job_id: string
1277     @param job_id: job identifier
1278     @rtype: L{_QueuedJob} or None
1279     @return: either None or the job object
1280
1281     """
1282     filepath = self._GetJobPath(job_id)
1283     logging.debug("Loading job from %s", filepath)
1284     try:
1285       raw_data = utils.ReadFile(filepath)
1286     except EnvironmentError, err:
1287       if err.errno in (errno.ENOENT, ):
1288         return None
1289       raise
1290
1291     try:
1292       data = serializer.LoadJson(raw_data)
1293       job = _QueuedJob.Restore(self, data)
1294     except Exception, err: # pylint: disable-msg=W0703
1295       raise errors.JobFileCorrupted(err)
1296
1297     return job
1298
1299   def SafeLoadJobFromDisk(self, job_id):
1300     """Load the given job file from disk.
1301
1302     Given a job file, read, load and restore it in a _QueuedJob format.
1303     In case of error reading the job, it gets returned as None, and the
1304     exception is logged.
1305
1306     @type job_id: string
1307     @param job_id: job identifier
1308     @rtype: L{_QueuedJob} or None
1309     @return: either None or the job object
1310
1311     """
1312     try:
1313       return self._LoadJobFromDisk(job_id)
1314     except (errors.JobFileCorrupted, EnvironmentError):
1315       logging.exception("Can't load/parse job %s", job_id)
1316       return None
1317
1318   @staticmethod
1319   def _IsQueueMarkedDrain():
1320     """Check if the queue is marked from drain.
1321
1322     This currently uses the queue drain file, which makes it a
1323     per-node flag. In the future this can be moved to the config file.
1324
1325     @rtype: boolean
1326     @return: True of the job queue is marked for draining
1327
1328     """
1329     return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1330
1331   def _UpdateQueueSizeUnlocked(self):
1332     """Update the queue size.
1333
1334     """
1335     self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1336
1337   @locking.ssynchronized(_LOCK)
1338   @_RequireOpenQueue
1339   def SetDrainFlag(self, drain_flag):
1340     """Sets the drain flag for the queue.
1341
1342     @type drain_flag: boolean
1343     @param drain_flag: Whether to set or unset the drain flag
1344
1345     """
1346     getents = runtime.GetEnts()
1347
1348     if drain_flag:
1349       utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True,
1350                       uid=getents.masterd_uid, gid=getents.masterd_gid)
1351     else:
1352       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1353
1354     self._drained = drain_flag
1355
1356     return True
1357
1358   @_RequireOpenQueue
1359   def _SubmitJobUnlocked(self, job_id, ops):
1360     """Create and store a new job.
1361
1362     This enters the job into our job queue and also puts it on the new
1363     queue, in order for it to be picked up by the queue processors.
1364
1365     @type job_id: job ID
1366     @param job_id: the job ID for the new job
1367     @type ops: list
1368     @param ops: The list of OpCodes that will become the new job.
1369     @rtype: L{_QueuedJob}
1370     @return: the job object to be queued
1371     @raise errors.JobQueueDrainError: if the job queue is marked for draining
1372     @raise errors.JobQueueFull: if the job queue has too many jobs in it
1373     @raise errors.GenericError: If an opcode is not valid
1374
1375     """
1376     # Ok when sharing the big job queue lock, as the drain file is created when
1377     # the lock is exclusive.
1378     if self._drained:
1379       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1380
1381     if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1382       raise errors.JobQueueFull()
1383
1384     job = _QueuedJob(self, job_id, ops)
1385
1386     # Check priority
1387     for idx, op in enumerate(job.ops):
1388       if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
1389         allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
1390         raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
1391                                   " are %s" % (idx, op.priority, allowed))
1392
1393     # Write to disk
1394     self.UpdateJobUnlocked(job)
1395
1396     self._queue_size += 1
1397
1398     logging.debug("Adding new job %s to the cache", job_id)
1399     self._memcache[job_id] = job
1400
1401     return job
1402
1403   @locking.ssynchronized(_LOCK)
1404   @_RequireOpenQueue
1405   def SubmitJob(self, ops):
1406     """Create and store a new job.
1407
1408     @see: L{_SubmitJobUnlocked}
1409
1410     """
1411     job_id = self._NewSerialsUnlocked(1)[0]
1412     self._wpool.AddTask((self._SubmitJobUnlocked(job_id, ops), ))
1413     return job_id
1414
1415   @locking.ssynchronized(_LOCK)
1416   @_RequireOpenQueue
1417   def SubmitManyJobs(self, jobs):
1418     """Create and store multiple jobs.
1419
1420     @see: L{_SubmitJobUnlocked}
1421
1422     """
1423     results = []
1424     tasks = []
1425     all_job_ids = self._NewSerialsUnlocked(len(jobs))
1426     for job_id, ops in zip(all_job_ids, jobs):
1427       try:
1428         tasks.append((self._SubmitJobUnlocked(job_id, ops), ))
1429         status = True
1430         data = job_id
1431       except errors.GenericError, err:
1432         data = str(err)
1433         status = False
1434       results.append((status, data))
1435     self._wpool.AddManyTasks(tasks)
1436
1437     return results
1438
1439   @_RequireOpenQueue
1440   def UpdateJobUnlocked(self, job, replicate=True):
1441     """Update a job's on disk storage.
1442
1443     After a job has been modified, this function needs to be called in
1444     order to write the changes to disk and replicate them to the other
1445     nodes.
1446
1447     @type job: L{_QueuedJob}
1448     @param job: the changed job
1449     @type replicate: boolean
1450     @param replicate: whether to replicate the change to remote nodes
1451
1452     """
1453     filename = self._GetJobPath(job.id)
1454     data = serializer.DumpJson(job.Serialize(), indent=False)
1455     logging.debug("Writing job %s to %s", job.id, filename)
1456     self._UpdateJobQueueFile(filename, data, replicate)
1457
1458   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1459                         timeout):
1460     """Waits for changes in a job.
1461
1462     @type job_id: string
1463     @param job_id: Job identifier
1464     @type fields: list of strings
1465     @param fields: Which fields to check for changes
1466     @type prev_job_info: list or None
1467     @param prev_job_info: Last job information returned
1468     @type prev_log_serial: int
1469     @param prev_log_serial: Last job message serial number
1470     @type timeout: float
1471     @param timeout: maximum time to wait in seconds
1472     @rtype: tuple (job info, log entries)
1473     @return: a tuple of the job information as required via
1474         the fields parameter, and the log entries as a list
1475
1476         if the job has not changed and the timeout has expired,
1477         we instead return a special value,
1478         L{constants.JOB_NOTCHANGED}, which should be interpreted
1479         as such by the clients
1480
1481     """
1482     load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
1483
1484     helper = _WaitForJobChangesHelper()
1485
1486     return helper(self._GetJobPath(job_id), load_fn,
1487                   fields, prev_job_info, prev_log_serial, timeout)
1488
1489   @locking.ssynchronized(_LOCK)
1490   @_RequireOpenQueue
1491   def CancelJob(self, job_id):
1492     """Cancels a job.
1493
1494     This will only succeed if the job has not started yet.
1495
1496     @type job_id: string
1497     @param job_id: job ID of job to be cancelled.
1498
1499     """
1500     logging.info("Cancelling job %s", job_id)
1501
1502     job = self._LoadJobUnlocked(job_id)
1503     if not job:
1504       logging.debug("Job %s not found", job_id)
1505       return (False, "Job %s not found" % job_id)
1506
1507     job_status = job.CalcStatus()
1508
1509     if job_status not in (constants.JOB_STATUS_QUEUED,
1510                           constants.JOB_STATUS_WAITLOCK):
1511       logging.debug("Job %s is no longer waiting in the queue", job.id)
1512       return (False, "Job %s is no longer waiting in the queue" % job.id)
1513
1514     if job_status == constants.JOB_STATUS_QUEUED:
1515       job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1516                             "Job canceled by request")
1517       msg = "Job %s canceled" % job.id
1518
1519     elif job_status == constants.JOB_STATUS_WAITLOCK:
1520       # The worker will notice the new status and cancel the job
1521       job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1522       msg = "Job %s will be canceled" % job.id
1523
1524     self.UpdateJobUnlocked(job)
1525
1526     return (True, msg)
1527
1528   @_RequireOpenQueue
1529   def _ArchiveJobsUnlocked(self, jobs):
1530     """Archives jobs.
1531
1532     @type jobs: list of L{_QueuedJob}
1533     @param jobs: Job objects
1534     @rtype: int
1535     @return: Number of archived jobs
1536
1537     """
1538     archive_jobs = []
1539     rename_files = []
1540     for job in jobs:
1541       if job.CalcStatus() not in constants.JOBS_FINALIZED:
1542         logging.debug("Job %s is not yet done", job.id)
1543         continue
1544
1545       archive_jobs.append(job)
1546
1547       old = self._GetJobPath(job.id)
1548       new = self._GetArchivedJobPath(job.id)
1549       rename_files.append((old, new))
1550
1551     # TODO: What if 1..n files fail to rename?
1552     self._RenameFilesUnlocked(rename_files)
1553
1554     logging.debug("Successfully archived job(s) %s",
1555                   utils.CommaJoin(job.id for job in archive_jobs))
1556
1557     # Since we haven't quite checked, above, if we succeeded or failed renaming
1558     # the files, we update the cached queue size from the filesystem. When we
1559     # get around to fix the TODO: above, we can use the number of actually
1560     # archived jobs to fix this.
1561     self._UpdateQueueSizeUnlocked()
1562     return len(archive_jobs)
1563
1564   @locking.ssynchronized(_LOCK)
1565   @_RequireOpenQueue
1566   def ArchiveJob(self, job_id):
1567     """Archives a job.
1568
1569     This is just a wrapper over L{_ArchiveJobsUnlocked}.
1570
1571     @type job_id: string
1572     @param job_id: Job ID of job to be archived.
1573     @rtype: bool
1574     @return: Whether job was archived
1575
1576     """
1577     logging.info("Archiving job %s", job_id)
1578
1579     job = self._LoadJobUnlocked(job_id)
1580     if not job:
1581       logging.debug("Job %s not found", job_id)
1582       return False
1583
1584     return self._ArchiveJobsUnlocked([job]) == 1
1585
1586   @locking.ssynchronized(_LOCK)
1587   @_RequireOpenQueue
1588   def AutoArchiveJobs(self, age, timeout):
1589     """Archives all jobs based on age.
1590
1591     The method will archive all jobs which are older than the age
1592     parameter. For jobs that don't have an end timestamp, the start
1593     timestamp will be considered. The special '-1' age will cause
1594     archival of all jobs (that are not running or queued).
1595
1596     @type age: int
1597     @param age: the minimum age in seconds
1598
1599     """
1600     logging.info("Archiving jobs with age more than %s seconds", age)
1601
1602     now = time.time()
1603     end_time = now + timeout
1604     archived_count = 0
1605     last_touched = 0
1606
1607     all_job_ids = self._GetJobIDsUnlocked()
1608     pending = []
1609     for idx, job_id in enumerate(all_job_ids):
1610       last_touched = idx + 1
1611
1612       # Not optimal because jobs could be pending
1613       # TODO: Measure average duration for job archival and take number of
1614       # pending jobs into account.
1615       if time.time() > end_time:
1616         break
1617
1618       # Returns None if the job failed to load
1619       job = self._LoadJobUnlocked(job_id)
1620       if job:
1621         if job.end_timestamp is None:
1622           if job.start_timestamp is None:
1623             job_age = job.received_timestamp
1624           else:
1625             job_age = job.start_timestamp
1626         else:
1627           job_age = job.end_timestamp
1628
1629         if age == -1 or now - job_age[0] > age:
1630           pending.append(job)
1631
1632           # Archive 10 jobs at a time
1633           if len(pending) >= 10:
1634             archived_count += self._ArchiveJobsUnlocked(pending)
1635             pending = []
1636
1637     if pending:
1638       archived_count += self._ArchiveJobsUnlocked(pending)
1639
1640     return (archived_count, len(all_job_ids) - last_touched)
1641
1642   def QueryJobs(self, job_ids, fields):
1643     """Returns a list of jobs in queue.
1644
1645     @type job_ids: list
1646     @param job_ids: sequence of job identifiers or None for all
1647     @type fields: list
1648     @param fields: names of fields to return
1649     @rtype: list
1650     @return: list one element per job, each element being list with
1651         the requested fields
1652
1653     """
1654     jobs = []
1655     list_all = False
1656     if not job_ids:
1657       # Since files are added to/removed from the queue atomically, there's no
1658       # risk of getting the job ids in an inconsistent state.
1659       job_ids = self._GetJobIDsUnlocked()
1660       list_all = True
1661
1662     for job_id in job_ids:
1663       job = self.SafeLoadJobFromDisk(job_id)
1664       if job is not None:
1665         jobs.append(job.GetInfo(fields))
1666       elif not list_all:
1667         jobs.append(None)
1668
1669     return jobs
1670
1671   @locking.ssynchronized(_LOCK)
1672   @_RequireOpenQueue
1673   def Shutdown(self):
1674     """Stops the job queue.
1675
1676     This shutdowns all the worker threads an closes the queue.
1677
1678     """
1679     self._wpool.TerminateWorkers()
1680
1681     self._queue_filelock.Close()
1682     self._queue_filelock = None