Hs2Py constants: add remaining '_autoconf.*' constants
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import logging
33 import errno
34 import time
35 import weakref
36 import threading
37 import itertools
38 import operator
39
40 try:
41   # pylint: disable=E0611
42   from pyinotify import pyinotify
43 except ImportError:
44   import pyinotify
45
46 from ganeti import asyncnotifier
47 from ganeti import constants
48 from ganeti import serializer
49 from ganeti import workerpool
50 from ganeti import locking
51 from ganeti import opcodes
52 from ganeti import opcodes_base
53 from ganeti import errors
54 from ganeti import mcpu
55 from ganeti import utils
56 from ganeti import jstore
57 from ganeti import rpc
58 from ganeti import runtime
59 from ganeti import netutils
60 from ganeti import compat
61 from ganeti import ht
62 from ganeti import query
63 from ganeti import qlang
64 from ganeti import pathutils
65 from ganeti import vcluster
66
67
68 JOBQUEUE_THREADS = 25
69
70 # member lock names to be passed to @ssynchronized decorator
71 _LOCK = "_lock"
72 _QUEUE = "_queue"
73
74 #: Retrieves "id" attribute
75 _GetIdAttr = operator.attrgetter("id")
76
77
78 class CancelJob(Exception):
79   """Special exception to cancel a job.
80
81   """
82
83
84 class QueueShutdown(Exception):
85   """Special exception to abort a job when the job queue is shutting down.
86
87   """
88
89
90 def TimeStampNow():
91   """Returns the current timestamp.
92
93   @rtype: tuple
94   @return: the current time in the (seconds, microseconds) format
95
96   """
97   return utils.SplitTime(time.time())
98
99
100 def _CallJqUpdate(runner, names, file_name, content):
101   """Updates job queue file after virtualizing filename.
102
103   """
104   virt_file_name = vcluster.MakeVirtualPath(file_name)
105   return runner.call_jobqueue_update(names, virt_file_name, content)
106
107
108 class _SimpleJobQuery:
109   """Wrapper for job queries.
110
111   Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
112
113   """
114   def __init__(self, fields):
115     """Initializes this class.
116
117     """
118     self._query = query.Query(query.JOB_FIELDS, fields)
119
120   def __call__(self, job):
121     """Executes a job query using cached field list.
122
123     """
124     return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
125
126
127 class _QueuedOpCode(object):
128   """Encapsulates an opcode object.
129
130   @ivar log: holds the execution log and consists of tuples
131   of the form C{(log_serial, timestamp, level, message)}
132   @ivar input: the OpCode we encapsulate
133   @ivar status: the current status
134   @ivar result: the result of the LU execution
135   @ivar start_timestamp: timestamp for the start of the execution
136   @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
137   @ivar stop_timestamp: timestamp for the end of the execution
138
139   """
140   __slots__ = ["input", "status", "result", "log", "priority",
141                "start_timestamp", "exec_timestamp", "end_timestamp",
142                "__weakref__"]
143
144   def __init__(self, op):
145     """Initializes instances of this class.
146
147     @type op: L{opcodes.OpCode}
148     @param op: the opcode we encapsulate
149
150     """
151     self.input = op
152     self.status = constants.OP_STATUS_QUEUED
153     self.result = None
154     self.log = []
155     self.start_timestamp = None
156     self.exec_timestamp = None
157     self.end_timestamp = None
158
159     # Get initial priority (it might change during the lifetime of this opcode)
160     self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
161
162   @classmethod
163   def Restore(cls, state):
164     """Restore the _QueuedOpCode from the serialized form.
165
166     @type state: dict
167     @param state: the serialized state
168     @rtype: _QueuedOpCode
169     @return: a new _QueuedOpCode instance
170
171     """
172     obj = _QueuedOpCode.__new__(cls)
173     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
174     obj.status = state["status"]
175     obj.result = state["result"]
176     obj.log = state["log"]
177     obj.start_timestamp = state.get("start_timestamp", None)
178     obj.exec_timestamp = state.get("exec_timestamp", None)
179     obj.end_timestamp = state.get("end_timestamp", None)
180     obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
181     return obj
182
183   def Serialize(self):
184     """Serializes this _QueuedOpCode.
185
186     @rtype: dict
187     @return: the dictionary holding the serialized state
188
189     """
190     return {
191       "input": self.input.__getstate__(),
192       "status": self.status,
193       "result": self.result,
194       "log": self.log,
195       "start_timestamp": self.start_timestamp,
196       "exec_timestamp": self.exec_timestamp,
197       "end_timestamp": self.end_timestamp,
198       "priority": self.priority,
199       }
200
201
202 class _QueuedJob(object):
203   """In-memory job representation.
204
205   This is what we use to track the user-submitted jobs. Locking must
206   be taken care of by users of this class.
207
208   @type queue: L{JobQueue}
209   @ivar queue: the parent queue
210   @ivar id: the job ID
211   @type ops: list
212   @ivar ops: the list of _QueuedOpCode that constitute the job
213   @type log_serial: int
214   @ivar log_serial: holds the index for the next log entry
215   @ivar received_timestamp: the timestamp for when the job was received
216   @ivar start_timestmap: the timestamp for start of execution
217   @ivar end_timestamp: the timestamp for end of execution
218   @ivar writable: Whether the job is allowed to be modified
219
220   """
221   # pylint: disable=W0212
222   __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
223                "received_timestamp", "start_timestamp", "end_timestamp",
224                "__weakref__", "processor_lock", "writable", "archived"]
225
226   def _AddReasons(self):
227     """Extend the reason trail
228
229     Add the reason for all the opcodes of this job to be executed.
230
231     """
232     count = 0
233     for queued_op in self.ops:
234       op = queued_op.input
235       reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__)
236       reason_text = "job=%d;index=%d" % (self.id, count)
237       reason = getattr(op, "reason", [])
238       reason.append((reason_src, reason_text, utils.EpochNano()))
239       op.reason = reason
240       count = count + 1
241
242   def __init__(self, queue, job_id, ops, writable):
243     """Constructor for the _QueuedJob.
244
245     @type queue: L{JobQueue}
246     @param queue: our parent queue
247     @type job_id: job_id
248     @param job_id: our job id
249     @type ops: list
250     @param ops: the list of opcodes we hold, which will be encapsulated
251         in _QueuedOpCodes
252     @type writable: bool
253     @param writable: Whether job can be modified
254
255     """
256     if not ops:
257       raise errors.GenericError("A job needs at least one opcode")
258
259     self.queue = queue
260     self.id = int(job_id)
261     self.ops = [_QueuedOpCode(op) for op in ops]
262     self._AddReasons()
263     self.log_serial = 0
264     self.received_timestamp = TimeStampNow()
265     self.start_timestamp = None
266     self.end_timestamp = None
267     self.archived = False
268
269     self._InitInMemory(self, writable)
270
271     assert not self.archived, "New jobs can not be marked as archived"
272
273   @staticmethod
274   def _InitInMemory(obj, writable):
275     """Initializes in-memory variables.
276
277     """
278     obj.writable = writable
279     obj.ops_iter = None
280     obj.cur_opctx = None
281
282     # Read-only jobs are not processed and therefore don't need a lock
283     if writable:
284       obj.processor_lock = threading.Lock()
285     else:
286       obj.processor_lock = None
287
288   def __repr__(self):
289     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
290               "id=%s" % self.id,
291               "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
292
293     return "<%s at %#x>" % (" ".join(status), id(self))
294
295   @classmethod
296   def Restore(cls, queue, state, writable, archived):
297     """Restore a _QueuedJob from serialized state:
298
299     @type queue: L{JobQueue}
300     @param queue: to which queue the restored job belongs
301     @type state: dict
302     @param state: the serialized state
303     @type writable: bool
304     @param writable: Whether job can be modified
305     @type archived: bool
306     @param archived: Whether job was already archived
307     @rtype: _JobQueue
308     @return: the restored _JobQueue instance
309
310     """
311     obj = _QueuedJob.__new__(cls)
312     obj.queue = queue
313     obj.id = int(state["id"])
314     obj.received_timestamp = state.get("received_timestamp", None)
315     obj.start_timestamp = state.get("start_timestamp", None)
316     obj.end_timestamp = state.get("end_timestamp", None)
317     obj.archived = archived
318
319     obj.ops = []
320     obj.log_serial = 0
321     for op_state in state["ops"]:
322       op = _QueuedOpCode.Restore(op_state)
323       for log_entry in op.log:
324         obj.log_serial = max(obj.log_serial, log_entry[0])
325       obj.ops.append(op)
326
327     cls._InitInMemory(obj, writable)
328
329     return obj
330
331   def Serialize(self):
332     """Serialize the _JobQueue instance.
333
334     @rtype: dict
335     @return: the serialized state
336
337     """
338     return {
339       "id": self.id,
340       "ops": [op.Serialize() for op in self.ops],
341       "start_timestamp": self.start_timestamp,
342       "end_timestamp": self.end_timestamp,
343       "received_timestamp": self.received_timestamp,
344       }
345
346   def CalcStatus(self):
347     """Compute the status of this job.
348
349     This function iterates over all the _QueuedOpCodes in the job and
350     based on their status, computes the job status.
351
352     The algorithm is:
353       - if we find a cancelled, or finished with error, the job
354         status will be the same
355       - otherwise, the last opcode with the status one of:
356           - waitlock
357           - canceling
358           - running
359
360         will determine the job status
361
362       - otherwise, it means either all opcodes are queued, or success,
363         and the job status will be the same
364
365     @return: the job status
366
367     """
368     status = constants.JOB_STATUS_QUEUED
369
370     all_success = True
371     for op in self.ops:
372       if op.status == constants.OP_STATUS_SUCCESS:
373         continue
374
375       all_success = False
376
377       if op.status == constants.OP_STATUS_QUEUED:
378         pass
379       elif op.status == constants.OP_STATUS_WAITING:
380         status = constants.JOB_STATUS_WAITING
381       elif op.status == constants.OP_STATUS_RUNNING:
382         status = constants.JOB_STATUS_RUNNING
383       elif op.status == constants.OP_STATUS_CANCELING:
384         status = constants.JOB_STATUS_CANCELING
385         break
386       elif op.status == constants.OP_STATUS_ERROR:
387         status = constants.JOB_STATUS_ERROR
388         # The whole job fails if one opcode failed
389         break
390       elif op.status == constants.OP_STATUS_CANCELED:
391         status = constants.OP_STATUS_CANCELED
392         break
393
394     if all_success:
395       status = constants.JOB_STATUS_SUCCESS
396
397     return status
398
399   def CalcPriority(self):
400     """Gets the current priority for this job.
401
402     Only unfinished opcodes are considered. When all are done, the default
403     priority is used.
404
405     @rtype: int
406
407     """
408     priorities = [op.priority for op in self.ops
409                   if op.status not in constants.OPS_FINALIZED]
410
411     if not priorities:
412       # All opcodes are done, assume default priority
413       return constants.OP_PRIO_DEFAULT
414
415     return min(priorities)
416
417   def GetLogEntries(self, newer_than):
418     """Selectively returns the log entries.
419
420     @type newer_than: None or int
421     @param newer_than: if this is None, return all log entries,
422         otherwise return only the log entries with serial higher
423         than this value
424     @rtype: list
425     @return: the list of the log entries selected
426
427     """
428     if newer_than is None:
429       serial = -1
430     else:
431       serial = newer_than
432
433     entries = []
434     for op in self.ops:
435       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
436
437     return entries
438
439   def GetInfo(self, fields):
440     """Returns information about a job.
441
442     @type fields: list
443     @param fields: names of fields to return
444     @rtype: list
445     @return: list with one element for each field
446     @raise errors.OpExecError: when an invalid field
447         has been passed
448
449     """
450     return _SimpleJobQuery(fields)(self)
451
452   def MarkUnfinishedOps(self, status, result):
453     """Mark unfinished opcodes with a given status and result.
454
455     This is an utility function for marking all running or waiting to
456     be run opcodes with a given status. Opcodes which are already
457     finalised are not changed.
458
459     @param status: a given opcode status
460     @param result: the opcode result
461
462     """
463     not_marked = True
464     for op in self.ops:
465       if op.status in constants.OPS_FINALIZED:
466         assert not_marked, "Finalized opcodes found after non-finalized ones"
467         continue
468       op.status = status
469       op.result = result
470       not_marked = False
471
472   def Finalize(self):
473     """Marks the job as finalized.
474
475     """
476     self.end_timestamp = TimeStampNow()
477
478   def Cancel(self):
479     """Marks job as canceled/-ing if possible.
480
481     @rtype: tuple; (bool, string)
482     @return: Boolean describing whether job was successfully canceled or marked
483       as canceling and a text message
484
485     """
486     status = self.CalcStatus()
487
488     if status == constants.JOB_STATUS_QUEUED:
489       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
490                              "Job canceled by request")
491       self.Finalize()
492       return (True, "Job %s canceled" % self.id)
493
494     elif status == constants.JOB_STATUS_WAITING:
495       # The worker will notice the new status and cancel the job
496       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
497       return (True, "Job %s will be canceled" % self.id)
498
499     else:
500       logging.debug("Job %s is no longer waiting in the queue", self.id)
501       return (False, "Job %s is no longer waiting in the queue" % self.id)
502
503   def ChangePriority(self, priority):
504     """Changes the job priority.
505
506     @type priority: int
507     @param priority: New priority
508     @rtype: tuple; (bool, string)
509     @return: Boolean describing whether job's priority was successfully changed
510       and a text message
511
512     """
513     status = self.CalcStatus()
514
515     if status in constants.JOBS_FINALIZED:
516       return (False, "Job %s is finished" % self.id)
517     elif status == constants.JOB_STATUS_CANCELING:
518       return (False, "Job %s is cancelling" % self.id)
519     else:
520       assert status in (constants.JOB_STATUS_QUEUED,
521                         constants.JOB_STATUS_WAITING,
522                         constants.JOB_STATUS_RUNNING)
523
524       changed = False
525       for op in self.ops:
526         if (op.status == constants.OP_STATUS_RUNNING or
527             op.status in constants.OPS_FINALIZED):
528           assert not changed, \
529             ("Found opcode for which priority should not be changed after"
530              " priority has been changed for previous opcodes")
531           continue
532
533         assert op.status in (constants.OP_STATUS_QUEUED,
534                              constants.OP_STATUS_WAITING)
535
536         changed = True
537
538         # Set new priority (doesn't modify opcode input)
539         op.priority = priority
540
541       if changed:
542         return (True, ("Priorities of pending opcodes for job %s have been"
543                        " changed to %s" % (self.id, priority)))
544       else:
545         return (False, "Job %s had no pending opcodes" % self.id)
546
547
548 class _OpExecCallbacks(mcpu.OpExecCbBase):
549   def __init__(self, queue, job, op):
550     """Initializes this class.
551
552     @type queue: L{JobQueue}
553     @param queue: Job queue
554     @type job: L{_QueuedJob}
555     @param job: Job object
556     @type op: L{_QueuedOpCode}
557     @param op: OpCode
558
559     """
560     assert queue, "Queue is missing"
561     assert job, "Job is missing"
562     assert op, "Opcode is missing"
563
564     self._queue = queue
565     self._job = job
566     self._op = op
567
568   def _CheckCancel(self):
569     """Raises an exception to cancel the job if asked to.
570
571     """
572     # Cancel here if we were asked to
573     if self._op.status == constants.OP_STATUS_CANCELING:
574       logging.debug("Canceling opcode")
575       raise CancelJob()
576
577     # See if queue is shutting down
578     if not self._queue.AcceptingJobsUnlocked():
579       logging.debug("Queue is shutting down")
580       raise QueueShutdown()
581
582   @locking.ssynchronized(_QUEUE, shared=1)
583   def NotifyStart(self):
584     """Mark the opcode as running, not lock-waiting.
585
586     This is called from the mcpu code as a notifier function, when the LU is
587     finally about to start the Exec() method. Of course, to have end-user
588     visible results, the opcode must be initially (before calling into
589     Processor.ExecOpCode) set to OP_STATUS_WAITING.
590
591     """
592     assert self._op in self._job.ops
593     assert self._op.status in (constants.OP_STATUS_WAITING,
594                                constants.OP_STATUS_CANCELING)
595
596     # Cancel here if we were asked to
597     self._CheckCancel()
598
599     logging.debug("Opcode is now running")
600
601     self._op.status = constants.OP_STATUS_RUNNING
602     self._op.exec_timestamp = TimeStampNow()
603
604     # And finally replicate the job status
605     self._queue.UpdateJobUnlocked(self._job)
606
607   @locking.ssynchronized(_QUEUE, shared=1)
608   def _AppendFeedback(self, timestamp, log_type, log_msg):
609     """Internal feedback append function, with locks
610
611     """
612     self._job.log_serial += 1
613     self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
614     self._queue.UpdateJobUnlocked(self._job, replicate=False)
615
616   def Feedback(self, *args):
617     """Append a log entry.
618
619     """
620     assert len(args) < 3
621
622     if len(args) == 1:
623       log_type = constants.ELOG_MESSAGE
624       log_msg = args[0]
625     else:
626       (log_type, log_msg) = args
627
628     # The time is split to make serialization easier and not lose
629     # precision.
630     timestamp = utils.SplitTime(time.time())
631     self._AppendFeedback(timestamp, log_type, log_msg)
632
633   def CurrentPriority(self):
634     """Returns current priority for opcode.
635
636     """
637     assert self._op.status in (constants.OP_STATUS_WAITING,
638                                constants.OP_STATUS_CANCELING)
639
640     # Cancel here if we were asked to
641     self._CheckCancel()
642
643     return self._op.priority
644
645   def SubmitManyJobs(self, jobs):
646     """Submits jobs for processing.
647
648     See L{JobQueue.SubmitManyJobs}.
649
650     """
651     # Locking is done in job queue
652     return self._queue.SubmitManyJobs(jobs)
653
654
655 class _JobChangesChecker(object):
656   def __init__(self, fields, prev_job_info, prev_log_serial):
657     """Initializes this class.
658
659     @type fields: list of strings
660     @param fields: Fields requested by LUXI client
661     @type prev_job_info: string
662     @param prev_job_info: previous job info, as passed by the LUXI client
663     @type prev_log_serial: string
664     @param prev_log_serial: previous job serial, as passed by the LUXI client
665
666     """
667     self._squery = _SimpleJobQuery(fields)
668     self._prev_job_info = prev_job_info
669     self._prev_log_serial = prev_log_serial
670
671   def __call__(self, job):
672     """Checks whether job has changed.
673
674     @type job: L{_QueuedJob}
675     @param job: Job object
676
677     """
678     assert not job.writable, "Expected read-only job"
679
680     status = job.CalcStatus()
681     job_info = self._squery(job)
682     log_entries = job.GetLogEntries(self._prev_log_serial)
683
684     # Serializing and deserializing data can cause type changes (e.g. from
685     # tuple to list) or precision loss. We're doing it here so that we get
686     # the same modifications as the data received from the client. Without
687     # this, the comparison afterwards might fail without the data being
688     # significantly different.
689     # TODO: we just deserialized from disk, investigate how to make sure that
690     # the job info and log entries are compatible to avoid this further step.
691     # TODO: Doing something like in testutils.py:UnifyValueType might be more
692     # efficient, though floats will be tricky
693     job_info = serializer.LoadJson(serializer.DumpJson(job_info))
694     log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
695
696     # Don't even try to wait if the job is no longer running, there will be
697     # no changes.
698     if (status not in (constants.JOB_STATUS_QUEUED,
699                        constants.JOB_STATUS_RUNNING,
700                        constants.JOB_STATUS_WAITING) or
701         job_info != self._prev_job_info or
702         (log_entries and self._prev_log_serial != log_entries[0][0])):
703       logging.debug("Job %s changed", job.id)
704       return (job_info, log_entries)
705
706     return None
707
708
709 class _JobFileChangesWaiter(object):
710   def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
711     """Initializes this class.
712
713     @type filename: string
714     @param filename: Path to job file
715     @raises errors.InotifyError: if the notifier cannot be setup
716
717     """
718     self._wm = _inotify_wm_cls()
719     self._inotify_handler = \
720       asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
721     self._notifier = \
722       pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
723     try:
724       self._inotify_handler.enable()
725     except Exception:
726       # pyinotify doesn't close file descriptors automatically
727       self._notifier.stop()
728       raise
729
730   def _OnInotify(self, notifier_enabled):
731     """Callback for inotify.
732
733     """
734     if not notifier_enabled:
735       self._inotify_handler.enable()
736
737   def Wait(self, timeout):
738     """Waits for the job file to change.
739
740     @type timeout: float
741     @param timeout: Timeout in seconds
742     @return: Whether there have been events
743
744     """
745     assert timeout >= 0
746     have_events = self._notifier.check_events(timeout * 1000)
747     if have_events:
748       self._notifier.read_events()
749     self._notifier.process_events()
750     return have_events
751
752   def Close(self):
753     """Closes underlying notifier and its file descriptor.
754
755     """
756     self._notifier.stop()
757
758
759 class _JobChangesWaiter(object):
760   def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
761     """Initializes this class.
762
763     @type filename: string
764     @param filename: Path to job file
765
766     """
767     self._filewaiter = None
768     self._filename = filename
769     self._waiter_cls = _waiter_cls
770
771   def Wait(self, timeout):
772     """Waits for a job to change.
773
774     @type timeout: float
775     @param timeout: Timeout in seconds
776     @return: Whether there have been events
777
778     """
779     if self._filewaiter:
780       return self._filewaiter.Wait(timeout)
781
782     # Lazy setup: Avoid inotify setup cost when job file has already changed.
783     # If this point is reached, return immediately and let caller check the job
784     # file again in case there were changes since the last check. This avoids a
785     # race condition.
786     self._filewaiter = self._waiter_cls(self._filename)
787
788     return True
789
790   def Close(self):
791     """Closes underlying waiter.
792
793     """
794     if self._filewaiter:
795       self._filewaiter.Close()
796
797
798 class _WaitForJobChangesHelper(object):
799   """Helper class using inotify to wait for changes in a job file.
800
801   This class takes a previous job status and serial, and alerts the client when
802   the current job status has changed.
803
804   """
805   @staticmethod
806   def _CheckForChanges(counter, job_load_fn, check_fn):
807     if counter.next() > 0:
808       # If this isn't the first check the job is given some more time to change
809       # again. This gives better performance for jobs generating many
810       # changes/messages.
811       time.sleep(0.1)
812
813     job = job_load_fn()
814     if not job:
815       raise errors.JobLost()
816
817     result = check_fn(job)
818     if result is None:
819       raise utils.RetryAgain()
820
821     return result
822
823   def __call__(self, filename, job_load_fn,
824                fields, prev_job_info, prev_log_serial, timeout,
825                _waiter_cls=_JobChangesWaiter):
826     """Waits for changes on a job.
827
828     @type filename: string
829     @param filename: File on which to wait for changes
830     @type job_load_fn: callable
831     @param job_load_fn: Function to load job
832     @type fields: list of strings
833     @param fields: Which fields to check for changes
834     @type prev_job_info: list or None
835     @param prev_job_info: Last job information returned
836     @type prev_log_serial: int
837     @param prev_log_serial: Last job message serial number
838     @type timeout: float
839     @param timeout: maximum time to wait in seconds
840
841     """
842     counter = itertools.count()
843     try:
844       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
845       waiter = _waiter_cls(filename)
846       try:
847         return utils.Retry(compat.partial(self._CheckForChanges,
848                                           counter, job_load_fn, check_fn),
849                            utils.RETRY_REMAINING_TIME, timeout,
850                            wait_fn=waiter.Wait)
851       finally:
852         waiter.Close()
853     except errors.JobLost:
854       return None
855     except utils.RetryTimeout:
856       return constants.JOB_NOTCHANGED
857
858
859 def _EncodeOpError(err):
860   """Encodes an error which occurred while processing an opcode.
861
862   """
863   if isinstance(err, errors.GenericError):
864     to_encode = err
865   else:
866     to_encode = errors.OpExecError(str(err))
867
868   return errors.EncodeException(to_encode)
869
870
871 class _TimeoutStrategyWrapper:
872   def __init__(self, fn):
873     """Initializes this class.
874
875     """
876     self._fn = fn
877     self._next = None
878
879   def _Advance(self):
880     """Gets the next timeout if necessary.
881
882     """
883     if self._next is None:
884       self._next = self._fn()
885
886   def Peek(self):
887     """Returns the next timeout.
888
889     """
890     self._Advance()
891     return self._next
892
893   def Next(self):
894     """Returns the current timeout and advances the internal state.
895
896     """
897     self._Advance()
898     result = self._next
899     self._next = None
900     return result
901
902
903 class _OpExecContext:
904   def __init__(self, op, index, log_prefix, timeout_strategy_factory):
905     """Initializes this class.
906
907     """
908     self.op = op
909     self.index = index
910     self.log_prefix = log_prefix
911     self.summary = op.input.Summary()
912
913     # Create local copy to modify
914     if getattr(op.input, opcodes_base.DEPEND_ATTR, None):
915       self.jobdeps = op.input.depends[:]
916     else:
917       self.jobdeps = None
918
919     self._timeout_strategy_factory = timeout_strategy_factory
920     self._ResetTimeoutStrategy()
921
922   def _ResetTimeoutStrategy(self):
923     """Creates a new timeout strategy.
924
925     """
926     self._timeout_strategy = \
927       _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
928
929   def CheckPriorityIncrease(self):
930     """Checks whether priority can and should be increased.
931
932     Called when locks couldn't be acquired.
933
934     """
935     op = self.op
936
937     # Exhausted all retries and next round should not use blocking acquire
938     # for locks?
939     if (self._timeout_strategy.Peek() is None and
940         op.priority > constants.OP_PRIO_HIGHEST):
941       logging.debug("Increasing priority")
942       op.priority -= 1
943       self._ResetTimeoutStrategy()
944       return True
945
946     return False
947
948   def GetNextLockTimeout(self):
949     """Returns the next lock acquire timeout.
950
951     """
952     return self._timeout_strategy.Next()
953
954
955 class _JobProcessor(object):
956   (DEFER,
957    WAITDEP,
958    FINISHED) = range(1, 4)
959
960   def __init__(self, queue, opexec_fn, job,
961                _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
962     """Initializes this class.
963
964     """
965     self.queue = queue
966     self.opexec_fn = opexec_fn
967     self.job = job
968     self._timeout_strategy_factory = _timeout_strategy_factory
969
970   @staticmethod
971   def _FindNextOpcode(job, timeout_strategy_factory):
972     """Locates the next opcode to run.
973
974     @type job: L{_QueuedJob}
975     @param job: Job object
976     @param timeout_strategy_factory: Callable to create new timeout strategy
977
978     """
979     # Create some sort of a cache to speed up locating next opcode for future
980     # lookups
981     # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
982     # pending and one for processed ops.
983     if job.ops_iter is None:
984       job.ops_iter = enumerate(job.ops)
985
986     # Find next opcode to run
987     while True:
988       try:
989         (idx, op) = job.ops_iter.next()
990       except StopIteration:
991         raise errors.ProgrammerError("Called for a finished job")
992
993       if op.status == constants.OP_STATUS_RUNNING:
994         # Found an opcode already marked as running
995         raise errors.ProgrammerError("Called for job marked as running")
996
997       opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
998                              timeout_strategy_factory)
999
1000       if op.status not in constants.OPS_FINALIZED:
1001         return opctx
1002
1003       # This is a job that was partially completed before master daemon
1004       # shutdown, so it can be expected that some opcodes are already
1005       # completed successfully (if any did error out, then the whole job
1006       # should have been aborted and not resubmitted for processing).
1007       logging.info("%s: opcode %s already processed, skipping",
1008                    opctx.log_prefix, opctx.summary)
1009
1010   @staticmethod
1011   def _MarkWaitlock(job, op):
1012     """Marks an opcode as waiting for locks.
1013
1014     The job's start timestamp is also set if necessary.
1015
1016     @type job: L{_QueuedJob}
1017     @param job: Job object
1018     @type op: L{_QueuedOpCode}
1019     @param op: Opcode object
1020
1021     """
1022     assert op in job.ops
1023     assert op.status in (constants.OP_STATUS_QUEUED,
1024                          constants.OP_STATUS_WAITING)
1025
1026     update = False
1027
1028     op.result = None
1029
1030     if op.status == constants.OP_STATUS_QUEUED:
1031       op.status = constants.OP_STATUS_WAITING
1032       update = True
1033
1034     if op.start_timestamp is None:
1035       op.start_timestamp = TimeStampNow()
1036       update = True
1037
1038     if job.start_timestamp is None:
1039       job.start_timestamp = op.start_timestamp
1040       update = True
1041
1042     assert op.status == constants.OP_STATUS_WAITING
1043
1044     return update
1045
1046   @staticmethod
1047   def _CheckDependencies(queue, job, opctx):
1048     """Checks if an opcode has dependencies and if so, processes them.
1049
1050     @type queue: L{JobQueue}
1051     @param queue: Queue object
1052     @type job: L{_QueuedJob}
1053     @param job: Job object
1054     @type opctx: L{_OpExecContext}
1055     @param opctx: Opcode execution context
1056     @rtype: bool
1057     @return: Whether opcode will be re-scheduled by dependency tracker
1058
1059     """
1060     op = opctx.op
1061
1062     result = False
1063
1064     while opctx.jobdeps:
1065       (dep_job_id, dep_status) = opctx.jobdeps[0]
1066
1067       (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
1068                                                           dep_status)
1069       assert ht.TNonEmptyString(depmsg), "No dependency message"
1070
1071       logging.info("%s: %s", opctx.log_prefix, depmsg)
1072
1073       if depresult == _JobDependencyManager.CONTINUE:
1074         # Remove dependency and continue
1075         opctx.jobdeps.pop(0)
1076
1077       elif depresult == _JobDependencyManager.WAIT:
1078         # Need to wait for notification, dependency tracker will re-add job
1079         # to workerpool
1080         result = True
1081         break
1082
1083       elif depresult == _JobDependencyManager.CANCEL:
1084         # Job was cancelled, cancel this job as well
1085         job.Cancel()
1086         assert op.status == constants.OP_STATUS_CANCELING
1087         break
1088
1089       elif depresult in (_JobDependencyManager.WRONGSTATUS,
1090                          _JobDependencyManager.ERROR):
1091         # Job failed or there was an error, this job must fail
1092         op.status = constants.OP_STATUS_ERROR
1093         op.result = _EncodeOpError(errors.OpExecError(depmsg))
1094         break
1095
1096       else:
1097         raise errors.ProgrammerError("Unknown dependency result '%s'" %
1098                                      depresult)
1099
1100     return result
1101
1102   def _ExecOpCodeUnlocked(self, opctx):
1103     """Processes one opcode and returns the result.
1104
1105     """
1106     op = opctx.op
1107
1108     assert op.status == constants.OP_STATUS_WAITING
1109
1110     timeout = opctx.GetNextLockTimeout()
1111
1112     try:
1113       # Make sure not to hold queue lock while calling ExecOpCode
1114       result = self.opexec_fn(op.input,
1115                               _OpExecCallbacks(self.queue, self.job, op),
1116                               timeout=timeout)
1117     except mcpu.LockAcquireTimeout:
1118       assert timeout is not None, "Received timeout for blocking acquire"
1119       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1120
1121       assert op.status in (constants.OP_STATUS_WAITING,
1122                            constants.OP_STATUS_CANCELING)
1123
1124       # Was job cancelled while we were waiting for the lock?
1125       if op.status == constants.OP_STATUS_CANCELING:
1126         return (constants.OP_STATUS_CANCELING, None)
1127
1128       # Queue is shutting down, return to queued
1129       if not self.queue.AcceptingJobsUnlocked():
1130         return (constants.OP_STATUS_QUEUED, None)
1131
1132       # Stay in waitlock while trying to re-acquire lock
1133       return (constants.OP_STATUS_WAITING, None)
1134     except CancelJob:
1135       logging.exception("%s: Canceling job", opctx.log_prefix)
1136       assert op.status == constants.OP_STATUS_CANCELING
1137       return (constants.OP_STATUS_CANCELING, None)
1138
1139     except QueueShutdown:
1140       logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1141
1142       assert op.status == constants.OP_STATUS_WAITING
1143
1144       # Job hadn't been started yet, so it should return to the queue
1145       return (constants.OP_STATUS_QUEUED, None)
1146
1147     except Exception, err: # pylint: disable=W0703
1148       logging.exception("%s: Caught exception in %s",
1149                         opctx.log_prefix, opctx.summary)
1150       return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1151     else:
1152       logging.debug("%s: %s successful",
1153                     opctx.log_prefix, opctx.summary)
1154       return (constants.OP_STATUS_SUCCESS, result)
1155
1156   def __call__(self, _nextop_fn=None):
1157     """Continues execution of a job.
1158
1159     @param _nextop_fn: Callback function for tests
1160     @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1161       be deferred and C{WAITDEP} if the dependency manager
1162       (L{_JobDependencyManager}) will re-schedule the job when appropriate
1163
1164     """
1165     queue = self.queue
1166     job = self.job
1167
1168     logging.debug("Processing job %s", job.id)
1169
1170     queue.acquire(shared=1)
1171     try:
1172       opcount = len(job.ops)
1173
1174       assert job.writable, "Expected writable job"
1175
1176       # Don't do anything for finalized jobs
1177       if job.CalcStatus() in constants.JOBS_FINALIZED:
1178         return self.FINISHED
1179
1180       # Is a previous opcode still pending?
1181       if job.cur_opctx:
1182         opctx = job.cur_opctx
1183         job.cur_opctx = None
1184       else:
1185         if __debug__ and _nextop_fn:
1186           _nextop_fn()
1187         opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1188
1189       op = opctx.op
1190
1191       # Consistency check
1192       assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1193                                      constants.OP_STATUS_CANCELING)
1194                         for i in job.ops[opctx.index + 1:])
1195
1196       assert op.status in (constants.OP_STATUS_QUEUED,
1197                            constants.OP_STATUS_WAITING,
1198                            constants.OP_STATUS_CANCELING)
1199
1200       assert (op.priority <= constants.OP_PRIO_LOWEST and
1201               op.priority >= constants.OP_PRIO_HIGHEST)
1202
1203       waitjob = None
1204
1205       if op.status != constants.OP_STATUS_CANCELING:
1206         assert op.status in (constants.OP_STATUS_QUEUED,
1207                              constants.OP_STATUS_WAITING)
1208
1209         # Prepare to start opcode
1210         if self._MarkWaitlock(job, op):
1211           # Write to disk
1212           queue.UpdateJobUnlocked(job)
1213
1214         assert op.status == constants.OP_STATUS_WAITING
1215         assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1216         assert job.start_timestamp and op.start_timestamp
1217         assert waitjob is None
1218
1219         # Check if waiting for a job is necessary
1220         waitjob = self._CheckDependencies(queue, job, opctx)
1221
1222         assert op.status in (constants.OP_STATUS_WAITING,
1223                              constants.OP_STATUS_CANCELING,
1224                              constants.OP_STATUS_ERROR)
1225
1226         if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1227                                          constants.OP_STATUS_ERROR)):
1228           logging.info("%s: opcode %s waiting for locks",
1229                        opctx.log_prefix, opctx.summary)
1230
1231           assert not opctx.jobdeps, "Not all dependencies were removed"
1232
1233           queue.release()
1234           try:
1235             (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1236           finally:
1237             queue.acquire(shared=1)
1238
1239           op.status = op_status
1240           op.result = op_result
1241
1242           assert not waitjob
1243
1244         if op.status in (constants.OP_STATUS_WAITING,
1245                          constants.OP_STATUS_QUEUED):
1246           # waiting: Couldn't get locks in time
1247           # queued: Queue is shutting down
1248           assert not op.end_timestamp
1249         else:
1250           # Finalize opcode
1251           op.end_timestamp = TimeStampNow()
1252
1253           if op.status == constants.OP_STATUS_CANCELING:
1254             assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1255                                   for i in job.ops[opctx.index:])
1256           else:
1257             assert op.status in constants.OPS_FINALIZED
1258
1259       if op.status == constants.OP_STATUS_QUEUED:
1260         # Queue is shutting down
1261         assert not waitjob
1262
1263         finalize = False
1264
1265         # Reset context
1266         job.cur_opctx = None
1267
1268         # In no case must the status be finalized here
1269         assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1270
1271       elif op.status == constants.OP_STATUS_WAITING or waitjob:
1272         finalize = False
1273
1274         if not waitjob and opctx.CheckPriorityIncrease():
1275           # Priority was changed, need to update on-disk file
1276           queue.UpdateJobUnlocked(job)
1277
1278         # Keep around for another round
1279         job.cur_opctx = opctx
1280
1281         assert (op.priority <= constants.OP_PRIO_LOWEST and
1282                 op.priority >= constants.OP_PRIO_HIGHEST)
1283
1284         # In no case must the status be finalized here
1285         assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1286
1287       else:
1288         # Ensure all opcodes so far have been successful
1289         assert (opctx.index == 0 or
1290                 compat.all(i.status == constants.OP_STATUS_SUCCESS
1291                            for i in job.ops[:opctx.index]))
1292
1293         # Reset context
1294         job.cur_opctx = None
1295
1296         if op.status == constants.OP_STATUS_SUCCESS:
1297           finalize = False
1298
1299         elif op.status == constants.OP_STATUS_ERROR:
1300           # Ensure failed opcode has an exception as its result
1301           assert errors.GetEncodedError(job.ops[opctx.index].result)
1302
1303           to_encode = errors.OpExecError("Preceding opcode failed")
1304           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1305                                 _EncodeOpError(to_encode))
1306           finalize = True
1307
1308           # Consistency check
1309           assert compat.all(i.status == constants.OP_STATUS_ERROR and
1310                             errors.GetEncodedError(i.result)
1311                             for i in job.ops[opctx.index:])
1312
1313         elif op.status == constants.OP_STATUS_CANCELING:
1314           job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1315                                 "Job canceled by request")
1316           finalize = True
1317
1318         else:
1319           raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1320
1321         if opctx.index == (opcount - 1):
1322           # Finalize on last opcode
1323           finalize = True
1324
1325         if finalize:
1326           # All opcodes have been run, finalize job
1327           job.Finalize()
1328
1329         # Write to disk. If the job status is final, this is the final write
1330         # allowed. Once the file has been written, it can be archived anytime.
1331         queue.UpdateJobUnlocked(job)
1332
1333         assert not waitjob
1334
1335         if finalize:
1336           logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1337           return self.FINISHED
1338
1339       assert not waitjob or queue.depmgr.JobWaiting(job)
1340
1341       if waitjob:
1342         return self.WAITDEP
1343       else:
1344         return self.DEFER
1345     finally:
1346       assert job.writable, "Job became read-only while being processed"
1347       queue.release()
1348
1349
1350 def _EvaluateJobProcessorResult(depmgr, job, result):
1351   """Looks at a result from L{_JobProcessor} for a job.
1352
1353   To be used in a L{_JobQueueWorker}.
1354
1355   """
1356   if result == _JobProcessor.FINISHED:
1357     # Notify waiting jobs
1358     depmgr.NotifyWaiters(job.id)
1359
1360   elif result == _JobProcessor.DEFER:
1361     # Schedule again
1362     raise workerpool.DeferTask(priority=job.CalcPriority())
1363
1364   elif result == _JobProcessor.WAITDEP:
1365     # No-op, dependency manager will re-schedule
1366     pass
1367
1368   else:
1369     raise errors.ProgrammerError("Job processor returned unknown status %s" %
1370                                  (result, ))
1371
1372
1373 class _JobQueueWorker(workerpool.BaseWorker):
1374   """The actual job workers.
1375
1376   """
1377   def RunTask(self, job): # pylint: disable=W0221
1378     """Job executor.
1379
1380     @type job: L{_QueuedJob}
1381     @param job: the job to be processed
1382
1383     """
1384     assert job.writable, "Expected writable job"
1385
1386     # Ensure only one worker is active on a single job. If a job registers for
1387     # a dependency job, and the other job notifies before the first worker is
1388     # done, the job can end up in the tasklist more than once.
1389     job.processor_lock.acquire()
1390     try:
1391       return self._RunTaskInner(job)
1392     finally:
1393       job.processor_lock.release()
1394
1395   def _RunTaskInner(self, job):
1396     """Executes a job.
1397
1398     Must be called with per-job lock acquired.
1399
1400     """
1401     queue = job.queue
1402     assert queue == self.pool.queue
1403
1404     setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1405     setname_fn(None)
1406
1407     proc = mcpu.Processor(queue.context, job.id)
1408
1409     # Create wrapper for setting thread name
1410     wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1411                                     proc.ExecOpCode)
1412
1413     _EvaluateJobProcessorResult(queue.depmgr, job,
1414                                 _JobProcessor(queue, wrap_execop_fn, job)())
1415
1416   @staticmethod
1417   def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1418     """Updates the worker thread name to include a short summary of the opcode.
1419
1420     @param setname_fn: Callable setting worker thread name
1421     @param execop_fn: Callable for executing opcode (usually
1422                       L{mcpu.Processor.ExecOpCode})
1423
1424     """
1425     setname_fn(op)
1426     try:
1427       return execop_fn(op, *args, **kwargs)
1428     finally:
1429       setname_fn(None)
1430
1431   @staticmethod
1432   def _GetWorkerName(job, op):
1433     """Sets the worker thread name.
1434
1435     @type job: L{_QueuedJob}
1436     @type op: L{opcodes.OpCode}
1437
1438     """
1439     parts = ["Job%s" % job.id]
1440
1441     if op:
1442       parts.append(op.TinySummary())
1443
1444     return "/".join(parts)
1445
1446
1447 class _JobQueueWorkerPool(workerpool.WorkerPool):
1448   """Simple class implementing a job-processing workerpool.
1449
1450   """
1451   def __init__(self, queue):
1452     super(_JobQueueWorkerPool, self).__init__("Jq",
1453                                               JOBQUEUE_THREADS,
1454                                               _JobQueueWorker)
1455     self.queue = queue
1456
1457
1458 class _JobDependencyManager:
1459   """Keeps track of job dependencies.
1460
1461   """
1462   (WAIT,
1463    ERROR,
1464    CANCEL,
1465    CONTINUE,
1466    WRONGSTATUS) = range(1, 6)
1467
1468   def __init__(self, getstatus_fn, enqueue_fn):
1469     """Initializes this class.
1470
1471     """
1472     self._getstatus_fn = getstatus_fn
1473     self._enqueue_fn = enqueue_fn
1474
1475     self._waiters = {}
1476     self._lock = locking.SharedLock("JobDepMgr")
1477
1478   @locking.ssynchronized(_LOCK, shared=1)
1479   def GetLockInfo(self, requested): # pylint: disable=W0613
1480     """Retrieves information about waiting jobs.
1481
1482     @type requested: set
1483     @param requested: Requested information, see C{query.LQ_*}
1484
1485     """
1486     # No need to sort here, that's being done by the lock manager and query
1487     # library. There are no priorities for notifying jobs, hence all show up as
1488     # one item under "pending".
1489     return [("job/%s" % job_id, None, None,
1490              [("job", [job.id for job in waiters])])
1491             for job_id, waiters in self._waiters.items()
1492             if waiters]
1493
1494   @locking.ssynchronized(_LOCK, shared=1)
1495   def JobWaiting(self, job):
1496     """Checks if a job is waiting.
1497
1498     """
1499     return compat.any(job in jobs
1500                       for jobs in self._waiters.values())
1501
1502   @locking.ssynchronized(_LOCK)
1503   def CheckAndRegister(self, job, dep_job_id, dep_status):
1504     """Checks if a dependency job has the requested status.
1505
1506     If the other job is not yet in a finalized status, the calling job will be
1507     notified (re-added to the workerpool) at a later point.
1508
1509     @type job: L{_QueuedJob}
1510     @param job: Job object
1511     @type dep_job_id: int
1512     @param dep_job_id: ID of dependency job
1513     @type dep_status: list
1514     @param dep_status: Required status
1515
1516     """
1517     assert ht.TJobId(job.id)
1518     assert ht.TJobId(dep_job_id)
1519     assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1520
1521     if job.id == dep_job_id:
1522       return (self.ERROR, "Job can't depend on itself")
1523
1524     # Get status of dependency job
1525     try:
1526       status = self._getstatus_fn(dep_job_id)
1527     except errors.JobLost, err:
1528       return (self.ERROR, "Dependency error: %s" % err)
1529
1530     assert status in constants.JOB_STATUS_ALL
1531
1532     job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1533
1534     if status not in constants.JOBS_FINALIZED:
1535       # Register for notification and wait for job to finish
1536       job_id_waiters.add(job)
1537       return (self.WAIT,
1538               "Need to wait for job %s, wanted status '%s'" %
1539               (dep_job_id, dep_status))
1540
1541     # Remove from waiters list
1542     if job in job_id_waiters:
1543       job_id_waiters.remove(job)
1544
1545     if (status == constants.JOB_STATUS_CANCELED and
1546         constants.JOB_STATUS_CANCELED not in dep_status):
1547       return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1548
1549     elif not dep_status or status in dep_status:
1550       return (self.CONTINUE,
1551               "Dependency job %s finished with status '%s'" %
1552               (dep_job_id, status))
1553
1554     else:
1555       return (self.WRONGSTATUS,
1556               "Dependency job %s finished with status '%s',"
1557               " not one of '%s' as required" %
1558               (dep_job_id, status, utils.CommaJoin(dep_status)))
1559
1560   def _RemoveEmptyWaitersUnlocked(self):
1561     """Remove all jobs without actual waiters.
1562
1563     """
1564     for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1565                    if not waiters]:
1566       del self._waiters[job_id]
1567
1568   def NotifyWaiters(self, job_id):
1569     """Notifies all jobs waiting for a certain job ID.
1570
1571     @attention: Do not call until L{CheckAndRegister} returned a status other
1572       than C{WAITDEP} for C{job_id}, or behaviour is undefined
1573     @type job_id: int
1574     @param job_id: Job ID
1575
1576     """
1577     assert ht.TJobId(job_id)
1578
1579     self._lock.acquire()
1580     try:
1581       self._RemoveEmptyWaitersUnlocked()
1582
1583       jobs = self._waiters.pop(job_id, None)
1584     finally:
1585       self._lock.release()
1586
1587     if jobs:
1588       # Re-add jobs to workerpool
1589       logging.debug("Re-adding %s jobs which were waiting for job %s",
1590                     len(jobs), job_id)
1591       self._enqueue_fn(jobs)
1592
1593
1594 def _RequireOpenQueue(fn):
1595   """Decorator for "public" functions.
1596
1597   This function should be used for all 'public' functions. That is,
1598   functions usually called from other classes. Note that this should
1599   be applied only to methods (not plain functions), since it expects
1600   that the decorated function is called with a first argument that has
1601   a '_queue_filelock' argument.
1602
1603   @warning: Use this decorator only after locking.ssynchronized
1604
1605   Example::
1606     @locking.ssynchronized(_LOCK)
1607     @_RequireOpenQueue
1608     def Example(self):
1609       pass
1610
1611   """
1612   def wrapper(self, *args, **kwargs):
1613     # pylint: disable=W0212
1614     assert self._queue_filelock is not None, "Queue should be open"
1615     return fn(self, *args, **kwargs)
1616   return wrapper
1617
1618
1619 def _RequireNonDrainedQueue(fn):
1620   """Decorator checking for a non-drained queue.
1621
1622   To be used with functions submitting new jobs.
1623
1624   """
1625   def wrapper(self, *args, **kwargs):
1626     """Wrapper function.
1627
1628     @raise errors.JobQueueDrainError: if the job queue is marked for draining
1629
1630     """
1631     # Ok when sharing the big job queue lock, as the drain file is created when
1632     # the lock is exclusive.
1633     # Needs access to protected member, pylint: disable=W0212
1634     if self._drained:
1635       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1636
1637     if not self._accepting_jobs:
1638       raise errors.JobQueueError("Job queue is shutting down, refusing job")
1639
1640     return fn(self, *args, **kwargs)
1641   return wrapper
1642
1643
1644 class JobQueue(object):
1645   """Queue used to manage the jobs.
1646
1647   """
1648   def __init__(self, context):
1649     """Constructor for JobQueue.
1650
1651     The constructor will initialize the job queue object and then
1652     start loading the current jobs from disk, either for starting them
1653     (if they were queue) or for aborting them (if they were already
1654     running).
1655
1656     @type context: GanetiContext
1657     @param context: the context object for access to the configuration
1658         data and other ganeti objects
1659
1660     """
1661     self.context = context
1662     self._memcache = weakref.WeakValueDictionary()
1663     self._my_hostname = netutils.Hostname.GetSysName()
1664
1665     # The Big JobQueue lock. If a code block or method acquires it in shared
1666     # mode safe it must guarantee concurrency with all the code acquiring it in
1667     # shared mode, including itself. In order not to acquire it at all
1668     # concurrency must be guaranteed with all code acquiring it in shared mode
1669     # and all code acquiring it exclusively.
1670     self._lock = locking.SharedLock("JobQueue")
1671
1672     self.acquire = self._lock.acquire
1673     self.release = self._lock.release
1674
1675     # Accept jobs by default
1676     self._accepting_jobs = True
1677
1678     # Initialize the queue, and acquire the filelock.
1679     # This ensures no other process is working on the job queue.
1680     self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1681
1682     # Read serial file
1683     self._last_serial = jstore.ReadSerial()
1684     assert self._last_serial is not None, ("Serial file was modified between"
1685                                            " check in jstore and here")
1686
1687     # Get initial list of nodes
1688     self._nodes = dict((n.name, n.primary_ip)
1689                        for n in self.context.cfg.GetAllNodesInfo().values()
1690                        if n.master_candidate)
1691
1692     # Remove master node
1693     self._nodes.pop(self._my_hostname, None)
1694
1695     # TODO: Check consistency across nodes
1696
1697     self._queue_size = None
1698     self._UpdateQueueSizeUnlocked()
1699     assert ht.TInt(self._queue_size)
1700     self._drained = jstore.CheckDrainFlag()
1701
1702     # Job dependencies
1703     self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1704                                         self._EnqueueJobs)
1705     self.context.glm.AddToLockMonitor(self.depmgr)
1706
1707     # Setup worker pool
1708     self._wpool = _JobQueueWorkerPool(self)
1709     try:
1710       self._InspectQueue()
1711     except:
1712       self._wpool.TerminateWorkers()
1713       raise
1714
1715   @locking.ssynchronized(_LOCK)
1716   @_RequireOpenQueue
1717   def _InspectQueue(self):
1718     """Loads the whole job queue and resumes unfinished jobs.
1719
1720     This function needs the lock here because WorkerPool.AddTask() may start a
1721     job while we're still doing our work.
1722
1723     """
1724     logging.info("Inspecting job queue")
1725
1726     restartjobs = []
1727
1728     all_job_ids = self._GetJobIDsUnlocked()
1729     jobs_count = len(all_job_ids)
1730     lastinfo = time.time()
1731     for idx, job_id in enumerate(all_job_ids):
1732       # Give an update every 1000 jobs or 10 seconds
1733       if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1734           idx == (jobs_count - 1)):
1735         logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1736                      idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1737         lastinfo = time.time()
1738
1739       job = self._LoadJobUnlocked(job_id)
1740
1741       # a failure in loading the job can cause 'None' to be returned
1742       if job is None:
1743         continue
1744
1745       status = job.CalcStatus()
1746
1747       if status == constants.JOB_STATUS_QUEUED:
1748         restartjobs.append(job)
1749
1750       elif status in (constants.JOB_STATUS_RUNNING,
1751                       constants.JOB_STATUS_WAITING,
1752                       constants.JOB_STATUS_CANCELING):
1753         logging.warning("Unfinished job %s found: %s", job.id, job)
1754
1755         if status == constants.JOB_STATUS_WAITING:
1756           # Restart job
1757           job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1758           restartjobs.append(job)
1759         else:
1760           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1761                                 "Unclean master daemon shutdown")
1762           job.Finalize()
1763
1764         self.UpdateJobUnlocked(job)
1765
1766     if restartjobs:
1767       logging.info("Restarting %s jobs", len(restartjobs))
1768       self._EnqueueJobsUnlocked(restartjobs)
1769
1770     logging.info("Job queue inspection finished")
1771
1772   def _GetRpc(self, address_list):
1773     """Gets RPC runner with context.
1774
1775     """
1776     return rpc.JobQueueRunner(self.context, address_list)
1777
1778   @locking.ssynchronized(_LOCK)
1779   @_RequireOpenQueue
1780   def AddNode(self, node):
1781     """Register a new node with the queue.
1782
1783     @type node: L{objects.Node}
1784     @param node: the node object to be added
1785
1786     """
1787     node_name = node.name
1788     assert node_name != self._my_hostname
1789
1790     # Clean queue directory on added node
1791     result = self._GetRpc(None).call_jobqueue_purge(node_name)
1792     msg = result.fail_msg
1793     if msg:
1794       logging.warning("Cannot cleanup queue directory on node %s: %s",
1795                       node_name, msg)
1796
1797     if not node.master_candidate:
1798       # remove if existing, ignoring errors
1799       self._nodes.pop(node_name, None)
1800       # and skip the replication of the job ids
1801       return
1802
1803     # Upload the whole queue excluding archived jobs
1804     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1805
1806     # Upload current serial file
1807     files.append(pathutils.JOB_QUEUE_SERIAL_FILE)
1808
1809     # Static address list
1810     addrs = [node.primary_ip]
1811
1812     for file_name in files:
1813       # Read file content
1814       content = utils.ReadFile(file_name)
1815
1816       result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
1817                              file_name, content)
1818       msg = result[node_name].fail_msg
1819       if msg:
1820         logging.error("Failed to upload file %s to node %s: %s",
1821                       file_name, node_name, msg)
1822
1823     # Set queue drained flag
1824     result = \
1825       self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
1826                                                        self._drained)
1827     msg = result[node_name].fail_msg
1828     if msg:
1829       logging.error("Failed to set queue drained flag on node %s: %s",
1830                     node_name, msg)
1831
1832     self._nodes[node_name] = node.primary_ip
1833
1834   @locking.ssynchronized(_LOCK)
1835   @_RequireOpenQueue
1836   def RemoveNode(self, node_name):
1837     """Callback called when removing nodes from the cluster.
1838
1839     @type node_name: str
1840     @param node_name: the name of the node to remove
1841
1842     """
1843     self._nodes.pop(node_name, None)
1844
1845   @staticmethod
1846   def _CheckRpcResult(result, nodes, failmsg):
1847     """Verifies the status of an RPC call.
1848
1849     Since we aim to keep consistency should this node (the current
1850     master) fail, we will log errors if our rpc fail, and especially
1851     log the case when more than half of the nodes fails.
1852
1853     @param result: the data as returned from the rpc call
1854     @type nodes: list
1855     @param nodes: the list of nodes we made the call to
1856     @type failmsg: str
1857     @param failmsg: the identifier to be used for logging
1858
1859     """
1860     failed = []
1861     success = []
1862
1863     for node in nodes:
1864       msg = result[node].fail_msg
1865       if msg:
1866         failed.append(node)
1867         logging.error("RPC call %s (%s) failed on node %s: %s",
1868                       result[node].call, failmsg, node, msg)
1869       else:
1870         success.append(node)
1871
1872     # +1 for the master node
1873     if (len(success) + 1) < len(failed):
1874       # TODO: Handle failing nodes
1875       logging.error("More than half of the nodes failed")
1876
1877   def _GetNodeIp(self):
1878     """Helper for returning the node name/ip list.
1879
1880     @rtype: (list, list)
1881     @return: a tuple of two lists, the first one with the node
1882         names and the second one with the node addresses
1883
1884     """
1885     # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1886     name_list = self._nodes.keys()
1887     addr_list = [self._nodes[name] for name in name_list]
1888     return name_list, addr_list
1889
1890   def _UpdateJobQueueFile(self, file_name, data, replicate):
1891     """Writes a file locally and then replicates it to all nodes.
1892
1893     This function will replace the contents of a file on the local
1894     node and then replicate it to all the other nodes we have.
1895
1896     @type file_name: str
1897     @param file_name: the path of the file to be replicated
1898     @type data: str
1899     @param data: the new contents of the file
1900     @type replicate: boolean
1901     @param replicate: whether to spread the changes to the remote nodes
1902
1903     """
1904     getents = runtime.GetEnts()
1905     utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1906                     gid=getents.daemons_gid,
1907                     mode=constants.JOB_QUEUE_FILES_PERMS)
1908
1909     if replicate:
1910       names, addrs = self._GetNodeIp()
1911       result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
1912       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1913
1914   def _RenameFilesUnlocked(self, rename):
1915     """Renames a file locally and then replicate the change.
1916
1917     This function will rename a file in the local queue directory
1918     and then replicate this rename to all the other nodes we have.
1919
1920     @type rename: list of (old, new)
1921     @param rename: List containing tuples mapping old to new names
1922
1923     """
1924     # Rename them locally
1925     for old, new in rename:
1926       utils.RenameFile(old, new, mkdir=True)
1927
1928     # ... and on all nodes
1929     names, addrs = self._GetNodeIp()
1930     result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1931     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1932
1933   def _NewSerialsUnlocked(self, count):
1934     """Generates a new job identifier.
1935
1936     Job identifiers are unique during the lifetime of a cluster.
1937
1938     @type count: integer
1939     @param count: how many serials to return
1940     @rtype: list of int
1941     @return: a list of job identifiers.
1942
1943     """
1944     assert ht.TNonNegativeInt(count)
1945
1946     # New number
1947     serial = self._last_serial + count
1948
1949     # Write to file
1950     self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
1951                              "%s\n" % serial, True)
1952
1953     result = [jstore.FormatJobID(v)
1954               for v in range(self._last_serial + 1, serial + 1)]
1955
1956     # Keep it only if we were able to write the file
1957     self._last_serial = serial
1958
1959     assert len(result) == count
1960
1961     return result
1962
1963   @staticmethod
1964   def _GetJobPath(job_id):
1965     """Returns the job file for a given job id.
1966
1967     @type job_id: str
1968     @param job_id: the job identifier
1969     @rtype: str
1970     @return: the path to the job file
1971
1972     """
1973     return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1974
1975   @staticmethod
1976   def _GetArchivedJobPath(job_id):
1977     """Returns the archived job file for a give job id.
1978
1979     @type job_id: str
1980     @param job_id: the job identifier
1981     @rtype: str
1982     @return: the path to the archived job file
1983
1984     """
1985     return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
1986                           jstore.GetArchiveDirectory(job_id),
1987                           "job-%s" % job_id)
1988
1989   @staticmethod
1990   def _DetermineJobDirectories(archived):
1991     """Build list of directories containing job files.
1992
1993     @type archived: bool
1994     @param archived: Whether to include directories for archived jobs
1995     @rtype: list
1996
1997     """
1998     result = [pathutils.QUEUE_DIR]
1999
2000     if archived:
2001       archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
2002       result.extend(map(compat.partial(utils.PathJoin, archive_path),
2003                         utils.ListVisibleFiles(archive_path)))
2004
2005     return result
2006
2007   @classmethod
2008   def _GetJobIDsUnlocked(cls, sort=True, archived=False):
2009     """Return all known job IDs.
2010
2011     The method only looks at disk because it's a requirement that all
2012     jobs are present on disk (so in the _memcache we don't have any
2013     extra IDs).
2014
2015     @type sort: boolean
2016     @param sort: perform sorting on the returned job ids
2017     @rtype: list
2018     @return: the list of job IDs
2019
2020     """
2021     jlist = []
2022
2023     for path in cls._DetermineJobDirectories(archived):
2024       for filename in utils.ListVisibleFiles(path):
2025         m = constants.JOB_FILE_RE.match(filename)
2026         if m:
2027           jlist.append(int(m.group(1)))
2028
2029     if sort:
2030       jlist.sort()
2031     return jlist
2032
2033   def _LoadJobUnlocked(self, job_id):
2034     """Loads a job from the disk or memory.
2035
2036     Given a job id, this will return the cached job object if
2037     existing, or try to load the job from the disk. If loading from
2038     disk, it will also add the job to the cache.
2039
2040     @type job_id: int
2041     @param job_id: the job id
2042     @rtype: L{_QueuedJob} or None
2043     @return: either None or the job object
2044
2045     """
2046     job = self._memcache.get(job_id, None)
2047     if job:
2048       logging.debug("Found job %s in memcache", job_id)
2049       assert job.writable, "Found read-only job in memcache"
2050       return job
2051
2052     try:
2053       job = self._LoadJobFromDisk(job_id, False)
2054       if job is None:
2055         return job
2056     except errors.JobFileCorrupted:
2057       old_path = self._GetJobPath(job_id)
2058       new_path = self._GetArchivedJobPath(job_id)
2059       if old_path == new_path:
2060         # job already archived (future case)
2061         logging.exception("Can't parse job %s", job_id)
2062       else:
2063         # non-archived case
2064         logging.exception("Can't parse job %s, will archive.", job_id)
2065         self._RenameFilesUnlocked([(old_path, new_path)])
2066       return None
2067
2068     assert job.writable, "Job just loaded is not writable"
2069
2070     self._memcache[job_id] = job
2071     logging.debug("Added job %s to the cache", job_id)
2072     return job
2073
2074   def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
2075     """Load the given job file from disk.
2076
2077     Given a job file, read, load and restore it in a _QueuedJob format.
2078
2079     @type job_id: int
2080     @param job_id: job identifier
2081     @type try_archived: bool
2082     @param try_archived: Whether to try loading an archived job
2083     @rtype: L{_QueuedJob} or None
2084     @return: either None or the job object
2085
2086     """
2087     path_functions = [(self._GetJobPath, False)]
2088
2089     if try_archived:
2090       path_functions.append((self._GetArchivedJobPath, True))
2091
2092     raw_data = None
2093     archived = None
2094
2095     for (fn, archived) in path_functions:
2096       filepath = fn(job_id)
2097       logging.debug("Loading job from %s", filepath)
2098       try:
2099         raw_data = utils.ReadFile(filepath)
2100       except EnvironmentError, err:
2101         if err.errno != errno.ENOENT:
2102           raise
2103       else:
2104         break
2105
2106     if not raw_data:
2107       return None
2108
2109     if writable is None:
2110       writable = not archived
2111
2112     try:
2113       data = serializer.LoadJson(raw_data)
2114       job = _QueuedJob.Restore(self, data, writable, archived)
2115     except Exception, err: # pylint: disable=W0703
2116       raise errors.JobFileCorrupted(err)
2117
2118     return job
2119
2120   def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
2121     """Load the given job file from disk.
2122
2123     Given a job file, read, load and restore it in a _QueuedJob format.
2124     In case of error reading the job, it gets returned as None, and the
2125     exception is logged.
2126
2127     @type job_id: int
2128     @param job_id: job identifier
2129     @type try_archived: bool
2130     @param try_archived: Whether to try loading an archived job
2131     @rtype: L{_QueuedJob} or None
2132     @return: either None or the job object
2133
2134     """
2135     try:
2136       return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
2137     except (errors.JobFileCorrupted, EnvironmentError):
2138       logging.exception("Can't load/parse job %s", job_id)
2139       return None
2140
2141   def _UpdateQueueSizeUnlocked(self):
2142     """Update the queue size.
2143
2144     """
2145     self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2146
2147   @locking.ssynchronized(_LOCK)
2148   @_RequireOpenQueue
2149   def SetDrainFlag(self, drain_flag):
2150     """Sets the drain flag for the queue.
2151
2152     @type drain_flag: boolean
2153     @param drain_flag: Whether to set or unset the drain flag
2154
2155     """
2156     # Change flag locally
2157     jstore.SetDrainFlag(drain_flag)
2158
2159     self._drained = drain_flag
2160
2161     # ... and on all nodes
2162     (names, addrs) = self._GetNodeIp()
2163     result = \
2164       self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
2165     self._CheckRpcResult(result, self._nodes,
2166                          "Setting queue drain flag to %s" % drain_flag)
2167
2168     return True
2169
2170   @_RequireOpenQueue
2171   def _SubmitJobUnlocked(self, job_id, ops):
2172     """Create and store a new job.
2173
2174     This enters the job into our job queue and also puts it on the new
2175     queue, in order for it to be picked up by the queue processors.
2176
2177     @type job_id: job ID
2178     @param job_id: the job ID for the new job
2179     @type ops: list
2180     @param ops: The list of OpCodes that will become the new job.
2181     @rtype: L{_QueuedJob}
2182     @return: the job object to be queued
2183     @raise errors.JobQueueFull: if the job queue has too many jobs in it
2184     @raise errors.GenericError: If an opcode is not valid
2185
2186     """
2187     if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
2188       raise errors.JobQueueFull()
2189
2190     job = _QueuedJob(self, job_id, ops, True)
2191
2192     for idx, op in enumerate(job.ops):
2193       # Check priority
2194       if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
2195         allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2196         raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
2197                                   " are %s" % (idx, op.priority, allowed))
2198
2199       # Check job dependencies
2200       dependencies = getattr(op.input, opcodes_base.DEPEND_ATTR, None)
2201       if not opcodes_base.TNoRelativeJobDependencies(dependencies):
2202         raise errors.GenericError("Opcode %s has invalid dependencies, must"
2203                                   " match %s: %s" %
2204                                   (idx, opcodes_base.TNoRelativeJobDependencies,
2205                                    dependencies))
2206
2207     # Write to disk
2208     self.UpdateJobUnlocked(job)
2209
2210     self._queue_size += 1
2211
2212     logging.debug("Adding new job %s to the cache", job_id)
2213     self._memcache[job_id] = job
2214
2215     return job
2216
2217   @locking.ssynchronized(_LOCK)
2218   @_RequireOpenQueue
2219   @_RequireNonDrainedQueue
2220   def SubmitJob(self, ops):
2221     """Create and store a new job.
2222
2223     @see: L{_SubmitJobUnlocked}
2224
2225     """
2226     (job_id, ) = self._NewSerialsUnlocked(1)
2227     self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
2228     return job_id
2229
2230   @locking.ssynchronized(_LOCK)
2231   @_RequireOpenQueue
2232   @_RequireNonDrainedQueue
2233   def SubmitManyJobs(self, jobs):
2234     """Create and store multiple jobs.
2235
2236     @see: L{_SubmitJobUnlocked}
2237
2238     """
2239     all_job_ids = self._NewSerialsUnlocked(len(jobs))
2240
2241     (results, added_jobs) = \
2242       self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
2243
2244     self._EnqueueJobsUnlocked(added_jobs)
2245
2246     return results
2247
2248   @staticmethod
2249   def _FormatSubmitError(msg, ops):
2250     """Formats errors which occurred while submitting a job.
2251
2252     """
2253     return ("%s; opcodes %s" %
2254             (msg, utils.CommaJoin(op.Summary() for op in ops)))
2255
2256   @staticmethod
2257   def _ResolveJobDependencies(resolve_fn, deps):
2258     """Resolves relative job IDs in dependencies.
2259
2260     @type resolve_fn: callable
2261     @param resolve_fn: Function to resolve a relative job ID
2262     @type deps: list
2263     @param deps: Dependencies
2264     @rtype: tuple; (boolean, string or list)
2265     @return: If successful (first tuple item), the returned list contains
2266       resolved job IDs along with the requested status; if not successful,
2267       the second element is an error message
2268
2269     """
2270     result = []
2271
2272     for (dep_job_id, dep_status) in deps:
2273       if ht.TRelativeJobId(dep_job_id):
2274         assert ht.TInt(dep_job_id) and dep_job_id < 0
2275         try:
2276           job_id = resolve_fn(dep_job_id)
2277         except IndexError:
2278           # Abort
2279           return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2280       else:
2281         job_id = dep_job_id
2282
2283       result.append((job_id, dep_status))
2284
2285     return (True, result)
2286
2287   def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2288     """Create and store multiple jobs.
2289
2290     @see: L{_SubmitJobUnlocked}
2291
2292     """
2293     results = []
2294     added_jobs = []
2295
2296     def resolve_fn(job_idx, reljobid):
2297       assert reljobid < 0
2298       return (previous_job_ids + job_ids[:job_idx])[reljobid]
2299
2300     for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2301       for op in ops:
2302         if getattr(op, opcodes_base.DEPEND_ATTR, None):
2303           (status, data) = \
2304             self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2305                                          op.depends)
2306           if not status:
2307             # Abort resolving dependencies
2308             assert ht.TNonEmptyString(data), "No error message"
2309             break
2310           # Use resolved dependencies
2311           op.depends = data
2312       else:
2313         try:
2314           job = self._SubmitJobUnlocked(job_id, ops)
2315         except errors.GenericError, err:
2316           status = False
2317           data = self._FormatSubmitError(str(err), ops)
2318         else:
2319           status = True
2320           data = job_id
2321           added_jobs.append(job)
2322
2323       results.append((status, data))
2324
2325     return (results, added_jobs)
2326
2327   @locking.ssynchronized(_LOCK)
2328   def _EnqueueJobs(self, jobs):
2329     """Helper function to add jobs to worker pool's queue.
2330
2331     @type jobs: list
2332     @param jobs: List of all jobs
2333
2334     """
2335     return self._EnqueueJobsUnlocked(jobs)
2336
2337   def _EnqueueJobsUnlocked(self, jobs):
2338     """Helper function to add jobs to worker pool's queue.
2339
2340     @type jobs: list
2341     @param jobs: List of all jobs
2342
2343     """
2344     assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2345     self._wpool.AddManyTasks([(job, ) for job in jobs],
2346                              priority=[job.CalcPriority() for job in jobs],
2347                              task_id=map(_GetIdAttr, jobs))
2348
2349   def _GetJobStatusForDependencies(self, job_id):
2350     """Gets the status of a job for dependencies.
2351
2352     @type job_id: int
2353     @param job_id: Job ID
2354     @raise errors.JobLost: If job can't be found
2355
2356     """
2357     # Not using in-memory cache as doing so would require an exclusive lock
2358
2359     # Try to load from disk
2360     job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2361
2362     assert not job.writable, "Got writable job" # pylint: disable=E1101
2363
2364     if job:
2365       return job.CalcStatus()
2366
2367     raise errors.JobLost("Job %s not found" % job_id)
2368
2369   @_RequireOpenQueue
2370   def UpdateJobUnlocked(self, job, replicate=True):
2371     """Update a job's on disk storage.
2372
2373     After a job has been modified, this function needs to be called in
2374     order to write the changes to disk and replicate them to the other
2375     nodes.
2376
2377     @type job: L{_QueuedJob}
2378     @param job: the changed job
2379     @type replicate: boolean
2380     @param replicate: whether to replicate the change to remote nodes
2381
2382     """
2383     if __debug__:
2384       finalized = job.CalcStatus() in constants.JOBS_FINALIZED
2385       assert (finalized ^ (job.end_timestamp is None))
2386       assert job.writable, "Can't update read-only job"
2387       assert not job.archived, "Can't update archived job"
2388
2389     filename = self._GetJobPath(job.id)
2390     data = serializer.DumpJson(job.Serialize())
2391     logging.debug("Writing job %s to %s", job.id, filename)
2392     self._UpdateJobQueueFile(filename, data, replicate)
2393
2394   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
2395                         timeout):
2396     """Waits for changes in a job.
2397
2398     @type job_id: int
2399     @param job_id: Job identifier
2400     @type fields: list of strings
2401     @param fields: Which fields to check for changes
2402     @type prev_job_info: list or None
2403     @param prev_job_info: Last job information returned
2404     @type prev_log_serial: int
2405     @param prev_log_serial: Last job message serial number
2406     @type timeout: float
2407     @param timeout: maximum time to wait in seconds
2408     @rtype: tuple (job info, log entries)
2409     @return: a tuple of the job information as required via
2410         the fields parameter, and the log entries as a list
2411
2412         if the job has not changed and the timeout has expired,
2413         we instead return a special value,
2414         L{constants.JOB_NOTCHANGED}, which should be interpreted
2415         as such by the clients
2416
2417     """
2418     load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
2419                              writable=False)
2420
2421     helper = _WaitForJobChangesHelper()
2422
2423     return helper(self._GetJobPath(job_id), load_fn,
2424                   fields, prev_job_info, prev_log_serial, timeout)
2425
2426   @locking.ssynchronized(_LOCK)
2427   @_RequireOpenQueue
2428   def CancelJob(self, job_id):
2429     """Cancels a job.
2430
2431     This will only succeed if the job has not started yet.
2432
2433     @type job_id: int
2434     @param job_id: job ID of job to be cancelled.
2435
2436     """
2437     logging.info("Cancelling job %s", job_id)
2438
2439     return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
2440
2441   @locking.ssynchronized(_LOCK)
2442   @_RequireOpenQueue
2443   def ChangeJobPriority(self, job_id, priority):
2444     """Changes a job's priority.
2445
2446     @type job_id: int
2447     @param job_id: ID of the job whose priority should be changed
2448     @type priority: int
2449     @param priority: New priority
2450
2451     """
2452     logging.info("Changing priority of job %s to %s", job_id, priority)
2453
2454     if priority not in constants.OP_PRIO_SUBMIT_VALID:
2455       allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2456       raise errors.GenericError("Invalid priority %s, allowed are %s" %
2457                                 (priority, allowed))
2458
2459     def fn(job):
2460       (success, msg) = job.ChangePriority(priority)
2461
2462       if success:
2463         try:
2464           self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
2465         except workerpool.NoSuchTask:
2466           logging.debug("Job %s is not in workerpool at this time", job.id)
2467
2468       return (success, msg)
2469
2470     return self._ModifyJobUnlocked(job_id, fn)
2471
2472   def _ModifyJobUnlocked(self, job_id, mod_fn):
2473     """Modifies a job.
2474
2475     @type job_id: int
2476     @param job_id: Job ID
2477     @type mod_fn: callable
2478     @param mod_fn: Modifying function, receiving job object as parameter,
2479       returning tuple of (status boolean, message string)
2480
2481     """
2482     job = self._LoadJobUnlocked(job_id)
2483     if not job:
2484       logging.debug("Job %s not found", job_id)
2485       return (False, "Job %s not found" % job_id)
2486
2487     assert job.writable, "Can't modify read-only job"
2488     assert not job.archived, "Can't modify archived job"
2489
2490     (success, msg) = mod_fn(job)
2491
2492     if success:
2493       # If the job was finalized (e.g. cancelled), this is the final write
2494       # allowed. The job can be archived anytime.
2495       self.UpdateJobUnlocked(job)
2496
2497     return (success, msg)
2498
2499   @_RequireOpenQueue
2500   def _ArchiveJobsUnlocked(self, jobs):
2501     """Archives jobs.
2502
2503     @type jobs: list of L{_QueuedJob}
2504     @param jobs: Job objects
2505     @rtype: int
2506     @return: Number of archived jobs
2507
2508     """
2509     archive_jobs = []
2510     rename_files = []
2511     for job in jobs:
2512       assert job.writable, "Can't archive read-only job"
2513       assert not job.archived, "Can't cancel archived job"
2514
2515       if job.CalcStatus() not in constants.JOBS_FINALIZED:
2516         logging.debug("Job %s is not yet done", job.id)
2517         continue
2518
2519       archive_jobs.append(job)
2520
2521       old = self._GetJobPath(job.id)
2522       new = self._GetArchivedJobPath(job.id)
2523       rename_files.append((old, new))
2524
2525     # TODO: What if 1..n files fail to rename?
2526     self._RenameFilesUnlocked(rename_files)
2527
2528     logging.debug("Successfully archived job(s) %s",
2529                   utils.CommaJoin(job.id for job in archive_jobs))
2530
2531     # Since we haven't quite checked, above, if we succeeded or failed renaming
2532     # the files, we update the cached queue size from the filesystem. When we
2533     # get around to fix the TODO: above, we can use the number of actually
2534     # archived jobs to fix this.
2535     self._UpdateQueueSizeUnlocked()
2536     return len(archive_jobs)
2537
2538   @locking.ssynchronized(_LOCK)
2539   @_RequireOpenQueue
2540   def ArchiveJob(self, job_id):
2541     """Archives a job.
2542
2543     This is just a wrapper over L{_ArchiveJobsUnlocked}.
2544
2545     @type job_id: int
2546     @param job_id: Job ID of job to be archived.
2547     @rtype: bool
2548     @return: Whether job was archived
2549
2550     """
2551     logging.info("Archiving job %s", job_id)
2552
2553     job = self._LoadJobUnlocked(job_id)
2554     if not job:
2555       logging.debug("Job %s not found", job_id)
2556       return False
2557
2558     return self._ArchiveJobsUnlocked([job]) == 1
2559
2560   @locking.ssynchronized(_LOCK)
2561   @_RequireOpenQueue
2562   def AutoArchiveJobs(self, age, timeout):
2563     """Archives all jobs based on age.
2564
2565     The method will archive all jobs which are older than the age
2566     parameter. For jobs that don't have an end timestamp, the start
2567     timestamp will be considered. The special '-1' age will cause
2568     archival of all jobs (that are not running or queued).
2569
2570     @type age: int
2571     @param age: the minimum age in seconds
2572
2573     """
2574     logging.info("Archiving jobs with age more than %s seconds", age)
2575
2576     now = time.time()
2577     end_time = now + timeout
2578     archived_count = 0
2579     last_touched = 0
2580
2581     all_job_ids = self._GetJobIDsUnlocked()
2582     pending = []
2583     for idx, job_id in enumerate(all_job_ids):
2584       last_touched = idx + 1
2585
2586       # Not optimal because jobs could be pending
2587       # TODO: Measure average duration for job archival and take number of
2588       # pending jobs into account.
2589       if time.time() > end_time:
2590         break
2591
2592       # Returns None if the job failed to load
2593       job = self._LoadJobUnlocked(job_id)
2594       if job:
2595         if job.end_timestamp is None:
2596           if job.start_timestamp is None:
2597             job_age = job.received_timestamp
2598           else:
2599             job_age = job.start_timestamp
2600         else:
2601           job_age = job.end_timestamp
2602
2603         if age == -1 or now - job_age[0] > age:
2604           pending.append(job)
2605
2606           # Archive 10 jobs at a time
2607           if len(pending) >= 10:
2608             archived_count += self._ArchiveJobsUnlocked(pending)
2609             pending = []
2610
2611     if pending:
2612       archived_count += self._ArchiveJobsUnlocked(pending)
2613
2614     return (archived_count, len(all_job_ids) - last_touched)
2615
2616   def _Query(self, fields, qfilter):
2617     qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
2618                        namefield="id")
2619
2620     # Archived jobs are only looked at if the "archived" field is referenced
2621     # either as a requested field or in the filter. By default archived jobs
2622     # are ignored.
2623     include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())
2624
2625     job_ids = qobj.RequestedNames()
2626
2627     list_all = (job_ids is None)
2628
2629     if list_all:
2630       # Since files are added to/removed from the queue atomically, there's no
2631       # risk of getting the job ids in an inconsistent state.
2632       job_ids = self._GetJobIDsUnlocked(archived=include_archived)
2633
2634     jobs = []
2635
2636     for job_id in job_ids:
2637       job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2638       if job is not None or not list_all:
2639         jobs.append((job_id, job))
2640
2641     return (qobj, jobs, list_all)
2642
2643   def QueryJobs(self, fields, qfilter):
2644     """Returns a list of jobs in queue.
2645
2646     @type fields: sequence
2647     @param fields: List of wanted fields
2648     @type qfilter: None or query2 filter (list)
2649     @param qfilter: Query filter
2650
2651     """
2652     (qobj, ctx, _) = self._Query(fields, qfilter)
2653
2654     return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
2655
2656   def OldStyleQueryJobs(self, job_ids, fields):
2657     """Returns a list of jobs in queue.
2658
2659     @type job_ids: list
2660     @param job_ids: sequence of job identifiers or None for all
2661     @type fields: list
2662     @param fields: names of fields to return
2663     @rtype: list
2664     @return: list one element per job, each element being list with
2665         the requested fields
2666
2667     """
2668     # backwards compat:
2669     job_ids = [int(jid) for jid in job_ids]
2670     qfilter = qlang.MakeSimpleFilter("id", job_ids)
2671
2672     (qobj, ctx, _) = self._Query(fields, qfilter)
2673
2674     return qobj.OldStyleQuery(ctx, sort_by_name=False)
2675
2676   @locking.ssynchronized(_LOCK)
2677   def PrepareShutdown(self):
2678     """Prepare to stop the job queue.
2679
2680     Disables execution of jobs in the workerpool and returns whether there are
2681     any jobs currently running. If the latter is the case, the job queue is not
2682     yet ready for shutdown. Once this function returns C{True} L{Shutdown} can
2683     be called without interfering with any job. Queued and unfinished jobs will
2684     be resumed next time.
2685
2686     Once this function has been called no new job submissions will be accepted
2687     (see L{_RequireNonDrainedQueue}).
2688
2689     @rtype: bool
2690     @return: Whether there are any running jobs
2691
2692     """
2693     if self._accepting_jobs:
2694       self._accepting_jobs = False
2695
2696       # Tell worker pool to stop processing pending tasks
2697       self._wpool.SetActive(False)
2698
2699     return self._wpool.HasRunningTasks()
2700
2701   def AcceptingJobsUnlocked(self):
2702     """Returns whether jobs are accepted.
2703
2704     Once L{PrepareShutdown} has been called, no new jobs are accepted and the
2705     queue is shutting down.
2706
2707     @rtype: bool
2708
2709     """
2710     return self._accepting_jobs
2711
2712   @locking.ssynchronized(_LOCK)
2713   @_RequireOpenQueue
2714   def Shutdown(self):
2715     """Stops the job queue.
2716
2717     This shutdowns all the worker threads an closes the queue.
2718
2719     """
2720     self._wpool.TerminateWorkers()
2721
2722     self._queue_filelock.Close()
2723     self._queue_filelock = None